// git_iris/agents/tools/parallel_analyze.rs
1//! Parallel Analysis Tool
2//!
3//! Enables Iris to spawn multiple independent subagents that analyze different
4//! portions of a codebase concurrently. This prevents context overflow when
5//! dealing with large changesets by distributing work across separate context windows.
6
7use anyhow::Result;
8use rig::{
9    client::{CompletionClient, ProviderClient},
10    completion::{Prompt, ToolDefinition},
11    providers::{anthropic, gemini, openai},
12    tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
22use crate::agents::debug as agent_debug;
23use crate::agents::provider::{
24    CompletionProfile, apply_completion_params, provider_from_name, resolve_api_key,
25};
26use crate::providers::Provider;
27
/// Default timeout for individual subagent tasks (2 minutes).
///
/// Applied per task: each spawned subagent is wrapped in `tokio::time::timeout`
/// with this duration unless a caller overrides it via `ParallelAnalyze::with_timeout`.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
30
/// Arguments for parallel analysis.
///
/// Deserialized from the model's tool call; `JsonSchema` is derived so the
/// argument shape can be surfaced in the tool definition.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// List of analysis tasks to run in parallel.
    /// Each task should be a focused prompt describing what to analyze.
    /// Example: `["Analyze security changes in auth/", "Review performance in db/"]`
    pub tasks: Vec<String>,
}
39
/// Result from a single subagent analysis.
///
/// `Clone` is derived so results can be duplicated when aggregating across
/// concurrently running tasks.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The original task prompt
    pub task: String,
    /// The analysis result (empty string when the task failed)
    pub result: String,
    /// Whether the analysis succeeded
    pub success: bool,
    /// Error message if failed; `None` on success
    pub error: Option<String>,
}
52
/// Aggregated results from all parallel analyses.
///
/// `successful + failed` always equals `results.len()`: every task yields
/// exactly one `SubagentResult`, whether it succeeded, errored, or timed out.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// Results from each subagent, in the same order as the input tasks
    pub results: Vec<SubagentResult>,
    /// Number of successful analyses
    pub successful: usize,
    /// Number of failed analyses
    pub failed: usize,
    /// Total execution time in milliseconds
    pub execution_time_ms: u64,
}
65
/// Provider-specific subagent runner.
///
/// One variant per supported completion provider. Each variant carries its
/// own typed client plus the model name and any extra completion parameters
/// forwarded to `apply_completion_params`. `Clone` is derived so each
/// spawned task can own its runner.
#[derive(Clone)]
enum SubagentRunner {
    /// OpenAI-backed subagent
    OpenAI {
        client: openai::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Anthropic-backed subagent
    Anthropic {
        client: anthropic::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Gemini-backed subagent (accepts both "google" and "gemini" names)
    Gemini {
        client: gemini::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
}
85
86impl SubagentRunner {
87    fn new(
88        provider: &str,
89        model: &str,
90        api_key: Option<&str>,
91        additional_params: HashMap<String, String>,
92    ) -> Result<Self> {
93        match provider {
94            "openai" => {
95                let client = Self::resolve_openai_client(api_key)?;
96                Ok(Self::OpenAI {
97                    client,
98                    model: model.to_string(),
99                    additional_params,
100                })
101            }
102            "anthropic" => {
103                let client = Self::resolve_anthropic_client(api_key)?;
104                Ok(Self::Anthropic {
105                    client,
106                    model: model.to_string(),
107                    additional_params,
108                })
109            }
110            "google" | "gemini" => {
111                let client = Self::resolve_gemini_client(api_key)?;
112                Ok(Self::Gemini {
113                    client,
114                    model: model.to_string(),
115                    additional_params,
116                })
117            }
118            _ => Err(anyhow::anyhow!(
119                "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
120                provider
121            )),
122        }
123    }
124
125    /// Create `OpenAI` client using shared resolution logic
126    ///
127    /// Uses `resolve_api_key` from provider module to maintain consistent
128    /// resolution order: config → env var → client default
129    fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
130        let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
131        match resolved_key {
132            Some(key) => openai::Client::new(&key)
133                // Sanitize error to avoid exposing key material
134                .map_err(|_| {
135                    anyhow::anyhow!(
136                        "Failed to create OpenAI client: authentication or configuration error"
137                    )
138                }),
139            None => Ok(openai::Client::from_env()),
140        }
141    }
142
143    /// Create `Anthropic` client using shared resolution logic
144    ///
145    /// Uses `resolve_api_key` from provider module to maintain consistent
146    /// resolution order: config → env var → client default
147    fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
148        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
149        match resolved_key {
150            Some(key) => anthropic::Client::new(&key)
151                // Sanitize error to avoid exposing key material
152                .map_err(|_| {
153                    anyhow::anyhow!(
154                        "Failed to create Anthropic client: authentication or configuration error"
155                    )
156                }),
157            None => Ok(anthropic::Client::from_env()),
158        }
159    }
160
161    /// Create `Gemini` client using shared resolution logic
162    ///
163    /// Uses `resolve_api_key` from provider module to maintain consistent
164    /// resolution order: config → env var → client default
165    fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
166        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
167        match resolved_key {
168            Some(key) => gemini::Client::new(&key)
169                // Sanitize error to avoid exposing key material
170                .map_err(|_| {
171                    anyhow::anyhow!(
172                        "Failed to create Gemini client: authentication or configuration error"
173                    )
174                }),
175            None => Ok(gemini::Client::from_env()),
176        }
177    }
178
179    async fn run_task(&self, task: &str) -> SubagentResult {
180        let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
181            task thoroughly and return a focused summary.\n\n\
182            Guidelines:\n\
183            - Use the available tools to gather necessary information\n\
184            - Focus only on what's asked\n\
185            - Return a clear, structured summary\n\
186            - Be concise but comprehensive";
187
188        // Use shared tool registry for consistent tool attachment
189        let result = match self {
190            Self::OpenAI {
191                client,
192                model,
193                additional_params,
194            } => {
195                let builder = client.agent(model).preamble(preamble);
196                let builder = apply_completion_params(
197                    builder,
198                    Provider::OpenAI,
199                    model,
200                    4096,
201                    Some(additional_params),
202                    CompletionProfile::Subagent,
203                );
204                let agent = crate::attach_core_tools!(builder).build();
205                agent.prompt(task).await
206            }
207            Self::Anthropic {
208                client,
209                model,
210                additional_params,
211            } => {
212                let builder = client.agent(model).preamble(preamble);
213                let builder = apply_completion_params(
214                    builder,
215                    Provider::Anthropic,
216                    model,
217                    4096,
218                    Some(additional_params),
219                    CompletionProfile::Subagent,
220                );
221                let agent = crate::attach_core_tools!(builder).build();
222                agent.prompt(task).await
223            }
224            Self::Gemini {
225                client,
226                model,
227                additional_params,
228            } => {
229                let builder = client.agent(model).preamble(preamble);
230                let builder = apply_completion_params(
231                    builder,
232                    Provider::Google,
233                    model,
234                    4096,
235                    Some(additional_params),
236                    CompletionProfile::Subagent,
237                );
238                let agent = crate::attach_core_tools!(builder).build();
239                agent.prompt(task).await
240            }
241        };
242
243        match result {
244            Ok(response) => SubagentResult {
245                task: task.to_string(),
246                result: response,
247                success: true,
248                error: None,
249            },
250            Err(e) => SubagentResult {
251                task: task.to_string(),
252                result: String::new(),
253                success: false,
254                error: Some(e.to_string()),
255            },
256        }
257    }
258}
259
/// Parallel analysis tool.
/// Spawns multiple subagents to analyze different aspects concurrently.
pub struct ParallelAnalyze {
    /// Provider-specific runner, cloned into each spawned task
    runner: SubagentRunner,
    /// Model name, kept only for debug/log messages
    model: String,
    /// Timeout in seconds for each subagent task
    timeout_secs: u64,
}
268
269impl ParallelAnalyze {
270    /// Create a new parallel analyzer with default timeout
271    ///
272    /// # Errors
273    ///
274    /// Returns an error when the requested provider runner cannot be created.
275    pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
276        Self::with_timeout(
277            provider,
278            model,
279            DEFAULT_SUBAGENT_TIMEOUT_SECS,
280            api_key,
281            None,
282        )
283    }
284
285    /// Create a new parallel analyzer with custom timeout
286    ///
287    /// # Errors
288    ///
289    /// Returns an error when the requested provider runner cannot be created.
290    pub fn with_timeout(
291        provider: &str,
292        model: &str,
293        timeout_secs: u64,
294        api_key: Option<&str>,
295        additional_params: Option<HashMap<String, String>>,
296    ) -> Result<Self> {
297        let provider_name = provider_from_name(provider)?;
298        // Create runner for the requested provider - no silent fallback
299        // If the user configures Anthropic, they should get Anthropic or a clear error
300        let runner = SubagentRunner::new(
301            provider_name.name(),
302            model,
303            api_key,
304            additional_params.unwrap_or_default(),
305        )
306        .map_err(|e| {
307            anyhow::anyhow!(
308                "Failed to create {} runner: {}. Check API key and network connectivity.",
309                provider,
310                e
311            )
312        })?;
313
314        Ok(Self {
315            runner,
316            model: model.to_string(),
317            timeout_secs,
318        })
319    }
320}
321
// Standard tool error type generated by the shared macro so this tool's
// error handling stays consistent with the other agent tools.
crate::define_tool_error!(ParallelAnalyzeError);
324
325impl Tool for ParallelAnalyze {
326    const NAME: &'static str = "parallel_analyze";
327    type Error = ParallelAnalyzeError;
328    type Args = ParallelAnalyzeArgs;
329    type Output = ParallelAnalyzeResult;
330
331    async fn definition(&self, _prompt: String) -> ToolDefinition {
332        ToolDefinition {
333            name: Self::NAME.to_string(),
334            description: "Run multiple analysis tasks in parallel using independent subagents. \
335                         Each subagent has its own context window, preventing overflow when \
336                         analyzing large changesets. Use this when you have multiple independent \
337                         analysis tasks that can run concurrently.\n\n\
338                         Best for:\n\
339                         - Analyzing different directories/modules separately\n\
340                         - Processing many commits in batches\n\
341                         - Running different types of analysis (security, performance, style) in parallel\n\n\
342                         Each task should be a focused prompt. Results are aggregated and returned."
343                .to_string(),
344            parameters: json!({
345                "type": "object",
346                "properties": {
347                    "tasks": {
348                        "type": "array",
349                        "items": { "type": "string" },
350                        "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
351                        "minItems": 1,
352                        "maxItems": 10
353                    }
354                },
355                "required": ["tasks"]
356            }),
357        }
358    }
359
360    #[allow(clippy::cognitive_complexity)]
361    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
362        use std::time::Instant;
363
364        let start = Instant::now();
365        let tasks = args.tasks;
366        let num_tasks = tasks.len();
367
368        agent_debug::debug_context_management(
369            "ParallelAnalyze",
370            &format!(
371                "Spawning {} subagents (fast model: {})",
372                num_tasks, self.model
373            ),
374        );
375
376        // Pre-allocate results vector to preserve task ordering
377        let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
378            Arc::new(Mutex::new(vec![None; num_tasks]));
379
380        // Spawn all tasks as parallel tokio tasks, tracking index for ordering
381        let mut handles = Vec::new();
382        let timeout = Duration::from_secs(self.timeout_secs);
383        for (index, task) in tasks.into_iter().enumerate() {
384            let runner = self.runner.clone();
385            let results = Arc::clone(&results);
386            let task_timeout = timeout;
387            let timeout_secs = self.timeout_secs;
388
389            let handle = tokio::spawn(async move {
390                // Wrap task execution in timeout to prevent hanging
391                let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
392                {
393                    Ok(result) => result,
394                    Err(_) => SubagentResult {
395                        task: task.clone(),
396                        result: String::new(),
397                        success: false,
398                        error: Some(format!("Task timed out after {} seconds", timeout_secs)),
399                    },
400                };
401
402                // Store result at original index to preserve ordering
403                let mut guard = results.lock().await;
404                guard[index] = Some(result);
405            });
406
407            handles.push(handle);
408        }
409
410        // Wait for all tasks to complete
411        for handle in handles {
412            if let Err(e) = handle.await {
413                agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
414            }
415        }
416
417        #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
418        let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
419
420        // Extract results, preserving original task order
421        let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
422            .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
423            .into_inner()
424            .into_iter()
425            .enumerate()
426            .map(|(i, opt)| {
427                opt.unwrap_or_else(|| SubagentResult {
428                    task: format!("Task {}", i),
429                    result: String::new(),
430                    success: false,
431                    error: Some("Task did not complete".to_string()),
432                })
433            })
434            .collect();
435
436        let successful = final_results.iter().filter(|r| r.success).count();
437        let failed = final_results.iter().filter(|r| !r.success).count();
438
439        agent_debug::debug_context_management(
440            "ParallelAnalyze",
441            &format!(
442                "{}/{} successful in {}ms",
443                successful, num_tasks, execution_time_ms
444            ),
445        );
446
447        Ok(ParallelAnalyzeResult {
448            results: final_results,
449            successful,
450            failed,
451            execution_time_ms,
452        })
453    }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated JSON schema must expose the `tasks` argument.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let root = schemars::schema_for!(ParallelAnalyzeArgs);
        let rendered = serde_json::to_string_pretty(&root).expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}