// Source file: git_iris/agents/tools/parallel_analyze.rs
1//! Parallel Analysis Tool
2//!
3//! Enables Iris to spawn multiple independent subagents that analyze different
4//! portions of a codebase concurrently. This prevents context overflow when
5//! dealing with large changesets by distributing work across separate context windows.
6
7use anyhow::Result;
8use rig::{
9    client::{CompletionClient, ProviderClient},
10    completion::{Prompt, ToolDefinition},
11    providers::{anthropic, gemini, openai},
12    tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Mutex;
20
21use crate::agents::debug as agent_debug;
22use crate::agents::provider::resolve_api_key;
23use crate::providers::Provider;
24
25/// Default timeout for individual subagent tasks (2 minutes)
26const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
27
/// Arguments for parallel analysis
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// List of analysis tasks to run in parallel.
    /// Each task should be a focused prompt describing what to analyze.
    /// Example: `["Analyze security changes in auth/", "Review performance in db/"]`
    /// Note: the hand-written tool definition advertises 1–10 items
    /// (`minItems`/`maxItems`); the derived schema here does not enforce that.
    pub tasks: Vec<String>,
}
36
/// Result from a single subagent analysis
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The original task prompt
    pub task: String,
    /// The analysis result (empty string when `success` is `false`)
    pub result: String,
    /// Whether the analysis succeeded
    pub success: bool,
    /// Error message if failed (`None` on success)
    pub error: Option<String>,
}
49
/// Aggregated results from all parallel analyses
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// Results from each subagent, in the same order as the input tasks
    pub results: Vec<SubagentResult>,
    /// Number of successful analyses
    pub successful: usize,
    /// Number of failed analyses (includes timeouts and tasks that panicked)
    pub failed: usize,
    /// Total wall-clock execution time in milliseconds
    pub execution_time_ms: u64,
}
62
/// Provider-specific subagent runner.
///
/// Each variant pairs a provider client with the model name used for
/// spawned subagents. `Clone` is required because the runner is cloned
/// into every spawned tokio task.
#[derive(Clone)]
enum SubagentRunner {
    OpenAI {
        client: openai::Client,
        model: String,
    },
    Anthropic {
        client: anthropic::Client,
        model: String,
    },
    Gemini {
        client: gemini::Client,
        model: String,
    },
}
79
80impl SubagentRunner {
81    fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
82        match provider {
83            "openai" => {
84                let client = Self::resolve_openai_client(api_key)?;
85                Ok(Self::OpenAI {
86                    client,
87                    model: model.to_string(),
88                })
89            }
90            "anthropic" => {
91                let client = Self::resolve_anthropic_client(api_key)?;
92                Ok(Self::Anthropic {
93                    client,
94                    model: model.to_string(),
95                })
96            }
97            "google" | "gemini" => {
98                let client = Self::resolve_gemini_client(api_key)?;
99                Ok(Self::Gemini {
100                    client,
101                    model: model.to_string(),
102                })
103            }
104            _ => Err(anyhow::anyhow!(
105                "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
106                provider
107            )),
108        }
109    }
110
111    /// Create `OpenAI` client using shared resolution logic
112    ///
113    /// Uses `resolve_api_key` from provider module to maintain consistent
114    /// resolution order: config → env var → client default
115    fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
116        let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
117        match resolved_key {
118            Some(key) => openai::Client::new(&key)
119                // Sanitize error to avoid exposing key material
120                .map_err(|_| {
121                    anyhow::anyhow!(
122                        "Failed to create OpenAI client: authentication or configuration error"
123                    )
124                }),
125            None => Ok(openai::Client::from_env()),
126        }
127    }
128
129    /// Create `Anthropic` client using shared resolution logic
130    ///
131    /// Uses `resolve_api_key` from provider module to maintain consistent
132    /// resolution order: config → env var → client default
133    fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
134        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
135        match resolved_key {
136            Some(key) => anthropic::Client::new(&key)
137                // Sanitize error to avoid exposing key material
138                .map_err(|_| {
139                    anyhow::anyhow!(
140                        "Failed to create Anthropic client: authentication or configuration error"
141                    )
142                }),
143            None => Ok(anthropic::Client::from_env()),
144        }
145    }
146
147    /// Create `Gemini` client using shared resolution logic
148    ///
149    /// Uses `resolve_api_key` from provider module to maintain consistent
150    /// resolution order: config → env var → client default
151    fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
152        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
153        match resolved_key {
154            Some(key) => gemini::Client::new(&key)
155                // Sanitize error to avoid exposing key material
156                .map_err(|_| {
157                    anyhow::anyhow!(
158                        "Failed to create Gemini client: authentication or configuration error"
159                    )
160                }),
161            None => Ok(gemini::Client::from_env()),
162        }
163    }
164
165    async fn run_task(&self, task: &str) -> SubagentResult {
166        let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
167            task thoroughly and return a focused summary.\n\n\
168            Guidelines:\n\
169            - Use the available tools to gather necessary information\n\
170            - Focus only on what's asked\n\
171            - Return a clear, structured summary\n\
172            - Be concise but comprehensive";
173
174        // Use shared tool registry for consistent tool attachment
175        let result = match self {
176            Self::OpenAI { client, model } => {
177                let builder = client.agent(model).preamble(preamble).max_tokens(4096);
178                let agent = crate::attach_core_tools!(builder).build();
179                agent.prompt(task).await
180            }
181            Self::Anthropic { client, model } => {
182                let builder = client.agent(model).preamble(preamble).max_tokens(4096);
183                let agent = crate::attach_core_tools!(builder).build();
184                agent.prompt(task).await
185            }
186            Self::Gemini { client, model } => {
187                let builder = client.agent(model).preamble(preamble).max_tokens(4096);
188                let agent = crate::attach_core_tools!(builder).build();
189                agent.prompt(task).await
190            }
191        };
192
193        match result {
194            Ok(response) => SubagentResult {
195                task: task.to_string(),
196                result: response,
197                success: true,
198                error: None,
199            },
200            Err(e) => SubagentResult {
201                task: task.to_string(),
202                result: String::new(),
203                success: false,
204                error: Some(e.to_string()),
205            },
206        }
207    }
208}
209
/// Parallel analysis tool
/// Spawns multiple subagents to analyze different aspects concurrently
pub struct ParallelAnalyze {
    /// Runner that executes individual subagent prompts
    runner: SubagentRunner,
    /// Model name, kept here for debug logging (the runner stores its own copy)
    model: String,
    /// Timeout in seconds for each subagent task
    timeout_secs: u64,
}
218
219impl ParallelAnalyze {
220    /// Create a new parallel analyzer with default timeout
221    pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
222        Self::with_timeout(provider, model, DEFAULT_SUBAGENT_TIMEOUT_SECS, api_key)
223    }
224
225    /// Create a new parallel analyzer with custom timeout
226    pub fn with_timeout(
227        provider: &str,
228        model: &str,
229        timeout_secs: u64,
230        api_key: Option<&str>,
231    ) -> Result<Self> {
232        // Create runner for the requested provider - no silent fallback
233        // If the user configures Anthropic, they should get Anthropic or a clear error
234        let runner = SubagentRunner::new(provider, model, api_key).map_err(|e| {
235            anyhow::anyhow!(
236                "Failed to create {} runner: {}. Check API key and network connectivity.",
237                provider,
238                e
239            )
240        })?;
241
242        Ok(Self {
243            runner,
244            model: model.to_string(),
245            timeout_secs,
246        })
247    }
248}
249
// Use the standard tool error macro for consistency across tools;
// this defines `ParallelAnalyzeError`, the type used as `Tool::Error` below.
crate::define_tool_error!(ParallelAnalyzeError);
252
253impl Tool for ParallelAnalyze {
254    const NAME: &'static str = "parallel_analyze";
255    type Error = ParallelAnalyzeError;
256    type Args = ParallelAnalyzeArgs;
257    type Output = ParallelAnalyzeResult;
258
259    async fn definition(&self, _prompt: String) -> ToolDefinition {
260        ToolDefinition {
261            name: Self::NAME.to_string(),
262            description: "Run multiple analysis tasks in parallel using independent subagents. \
263                         Each subagent has its own context window, preventing overflow when \
264                         analyzing large changesets. Use this when you have multiple independent \
265                         analysis tasks that can run concurrently.\n\n\
266                         Best for:\n\
267                         - Analyzing different directories/modules separately\n\
268                         - Processing many commits in batches\n\
269                         - Running different types of analysis (security, performance, style) in parallel\n\n\
270                         Each task should be a focused prompt. Results are aggregated and returned."
271                .to_string(),
272            parameters: json!({
273                "type": "object",
274                "properties": {
275                    "tasks": {
276                        "type": "array",
277                        "items": { "type": "string" },
278                        "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
279                        "minItems": 1,
280                        "maxItems": 10
281                    }
282                },
283                "required": ["tasks"]
284            }),
285        }
286    }
287
288    #[allow(clippy::cognitive_complexity)]
289    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
290        use std::time::Instant;
291
292        let start = Instant::now();
293        let tasks = args.tasks;
294        let num_tasks = tasks.len();
295
296        agent_debug::debug_context_management(
297            "ParallelAnalyze",
298            &format!(
299                "Spawning {} subagents (fast model: {})",
300                num_tasks, self.model
301            ),
302        );
303
304        // Pre-allocate results vector to preserve task ordering
305        let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
306            Arc::new(Mutex::new(vec![None; num_tasks]));
307
308        // Spawn all tasks as parallel tokio tasks, tracking index for ordering
309        let mut handles = Vec::new();
310        let timeout = Duration::from_secs(self.timeout_secs);
311        for (index, task) in tasks.into_iter().enumerate() {
312            let runner = self.runner.clone();
313            let results = Arc::clone(&results);
314            let task_timeout = timeout;
315            let timeout_secs = self.timeout_secs;
316
317            let handle = tokio::spawn(async move {
318                // Wrap task execution in timeout to prevent hanging
319                let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
320                {
321                    Ok(result) => result,
322                    Err(_) => SubagentResult {
323                        task: task.clone(),
324                        result: String::new(),
325                        success: false,
326                        error: Some(format!("Task timed out after {} seconds", timeout_secs)),
327                    },
328                };
329
330                // Store result at original index to preserve ordering
331                let mut guard = results.lock().await;
332                guard[index] = Some(result);
333            });
334
335            handles.push(handle);
336        }
337
338        // Wait for all tasks to complete
339        for handle in handles {
340            if let Err(e) = handle.await {
341                agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
342            }
343        }
344
345        #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
346        let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
347
348        // Extract results, preserving original task order
349        let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
350            .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
351            .into_inner()
352            .into_iter()
353            .enumerate()
354            .map(|(i, opt)| {
355                opt.unwrap_or_else(|| SubagentResult {
356                    task: format!("Task {}", i),
357                    result: String::new(),
358                    success: false,
359                    error: Some("Task did not complete".to_string()),
360                })
361            })
362            .collect();
363
364        let successful = final_results.iter().filter(|r| r.success).count();
365        let failed = final_results.iter().filter(|r| !r.success).count();
366
367        agent_debug::debug_context_management(
368            "ParallelAnalyze",
369            &format!(
370                "{}/{} successful in {}ms",
371                successful, num_tasks, execution_time_ms
372            ),
373        );
374
375        Ok(ParallelAnalyzeResult {
376            results: final_results,
377            successful,
378            failed,
379            execution_time_ms,
380        })
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// The JSON schema derived for the tool arguments must expose the
    /// `tasks` field.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let rendered =
            serde_json::to_string_pretty(&schema).expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}
394}