git_iris/agents/tools/
parallel_analyze.rs

1//! Parallel Analysis Tool
2//!
3//! Enables Iris to spawn multiple independent subagents that analyze different
4//! portions of a codebase concurrently. This prevents context overflow when
5//! dealing with large changesets by distributing work across separate context windows.
6
7use anyhow::Result;
8use rig::{
9    client::{CompletionClient, ProviderClient},
10    completion::{Prompt, ToolDefinition},
11    providers::{anthropic, openai},
12    tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Mutex;
20
21use crate::agents::debug as agent_debug;
22
/// Default timeout for individual subagent tasks (2 minutes).
/// Applied per spawned task; callers can override via `ParallelAnalyze::with_timeout`.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
25
/// Arguments for parallel analysis.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// List of analysis tasks to run in parallel.
    /// Each task should be a focused prompt describing what to analyze.
    /// Example: `["Analyze security changes in auth/", "Review performance in db/"]`
    ///
    /// NOTE(review): the advertised JSON schema bounds this to 1–10 items,
    /// but the bound is not enforced on deserialization here.
    pub tasks: Vec<String>,
}
34
/// Result from a single subagent analysis.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The original task prompt.
    pub task: String,
    /// The analysis result (empty string when `success` is `false`).
    pub result: String,
    /// Whether the analysis succeeded.
    pub success: bool,
    /// Error message if failed; `None` on success.
    pub error: Option<String>,
}
47
/// Aggregated results from all parallel analyses.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// Results from each subagent, in the same order as the submitted tasks.
    pub results: Vec<SubagentResult>,
    /// Number of successful analyses (`successful + failed == results.len()`).
    pub successful: usize,
    /// Number of failed analyses (errors, timeouts, or panicked tasks).
    pub failed: usize,
    /// Total wall-clock execution time in milliseconds for the whole batch.
    pub execution_time_ms: u64,
}
60
/// Provider-specific subagent runner.
///
/// `Clone` so each spawned tokio task can own its own handle to the
/// underlying provider client.
#[derive(Clone)]
enum SubagentRunner {
    /// OpenAI-backed subagent; client is configured from the environment.
    OpenAI {
        client: openai::Client,
        model: String,
    },
    /// Anthropic-backed subagent; client is configured from the environment.
    Anthropic {
        client: anthropic::Client,
        model: String,
    },
}
73
74impl SubagentRunner {
75    fn new(provider: &str, model: &str) -> Result<Self> {
76        match provider {
77            "openai" => {
78                let client = openai::Client::from_env();
79                Ok(Self::OpenAI {
80                    client,
81                    model: model.to_string(),
82                })
83            }
84            "anthropic" => {
85                let client = anthropic::Client::from_env();
86                Ok(Self::Anthropic {
87                    client,
88                    model: model.to_string(),
89                })
90            }
91            _ => Err(anyhow::anyhow!(
92                "Unsupported provider for parallel analysis: {}",
93                provider
94            )),
95        }
96    }
97
98    async fn run_task(&self, task: &str) -> SubagentResult {
99        let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
100            task thoroughly and return a focused summary.\n\n\
101            Guidelines:\n\
102            - Use the available tools to gather necessary information\n\
103            - Focus only on what's asked\n\
104            - Return a clear, structured summary\n\
105            - Be concise but comprehensive";
106
107        // Use shared tool registry for consistent tool attachment
108        let result = match self {
109            Self::OpenAI { client, model } => {
110                let builder = client.agent(model).preamble(preamble).max_tokens(4096);
111                let agent = crate::attach_core_tools!(builder).build();
112                agent.prompt(task).await
113            }
114            Self::Anthropic { client, model } => {
115                let builder = client.agent(model).preamble(preamble).max_tokens(4096);
116                let agent = crate::attach_core_tools!(builder).build();
117                agent.prompt(task).await
118            }
119        };
120
121        match result {
122            Ok(response) => SubagentResult {
123                task: task.to_string(),
124                result: response,
125                success: true,
126                error: None,
127            },
128            Err(e) => SubagentResult {
129                task: task.to_string(),
130                result: String::new(),
131                success: false,
132                error: Some(e.to_string()),
133            },
134        }
135    }
136}
137
/// Parallel analysis tool.
/// Spawns multiple subagents to analyze different aspects concurrently.
pub struct ParallelAnalyze {
    /// Provider-backed runner; cloned into each spawned subagent task.
    runner: SubagentRunner,
    /// Model name recorded at construction; used only in debug output.
    model: String,
    /// Timeout in seconds for each subagent task.
    timeout_secs: u64,
}
146
147impl ParallelAnalyze {
148    /// Create a new parallel analyzer with default timeout
149    pub fn new(provider: &str, model: &str) -> Result<Self> {
150        Self::with_timeout(provider, model, DEFAULT_SUBAGENT_TIMEOUT_SECS)
151    }
152
153    /// Create a new parallel analyzer with custom timeout
154    pub fn with_timeout(provider: &str, model: &str, timeout_secs: u64) -> Result<Self> {
155        // Try the requested provider first, then fall back to openai
156        let runner = SubagentRunner::new(provider, model).or_else(|e| {
157            tracing::warn!(
158                "Failed to create {} runner: {}, falling back to openai",
159                provider,
160                e
161            );
162            SubagentRunner::new("openai", "gpt-4o")
163        })?;
164
165        Ok(Self {
166            runner,
167            model: model.to_string(),
168            timeout_secs,
169        })
170    }
171}
172
// Use the crate's standard tool-error macro for consistency with the other
// tools; presumably generates a newtype error implementing the traits the
// rig `Tool` trait requires — confirm against the macro definition.
crate::define_tool_error!(ParallelAnalyzeError);
175
176impl Tool for ParallelAnalyze {
177    const NAME: &'static str = "parallel_analyze";
178    type Error = ParallelAnalyzeError;
179    type Args = ParallelAnalyzeArgs;
180    type Output = ParallelAnalyzeResult;
181
182    async fn definition(&self, _prompt: String) -> ToolDefinition {
183        ToolDefinition {
184            name: Self::NAME.to_string(),
185            description: "Run multiple analysis tasks in parallel using independent subagents. \
186                         Each subagent has its own context window, preventing overflow when \
187                         analyzing large changesets. Use this when you have multiple independent \
188                         analysis tasks that can run concurrently.\n\n\
189                         Best for:\n\
190                         - Analyzing different directories/modules separately\n\
191                         - Processing many commits in batches\n\
192                         - Running different types of analysis (security, performance, style) in parallel\n\n\
193                         Each task should be a focused prompt. Results are aggregated and returned."
194                .to_string(),
195            parameters: json!({
196                "type": "object",
197                "properties": {
198                    "tasks": {
199                        "type": "array",
200                        "items": { "type": "string" },
201                        "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
202                        "minItems": 1,
203                        "maxItems": 10
204                    }
205                },
206                "required": ["tasks"]
207            }),
208        }
209    }
210
211    #[allow(clippy::cognitive_complexity)]
212    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
213        use std::time::Instant;
214
215        let start = Instant::now();
216        let tasks = args.tasks;
217        let num_tasks = tasks.len();
218
219        agent_debug::debug_context_management(
220            "ParallelAnalyze",
221            &format!(
222                "Spawning {} subagents (fast model: {})",
223                num_tasks, self.model
224            ),
225        );
226
227        // Pre-allocate results vector to preserve task ordering
228        let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
229            Arc::new(Mutex::new(vec![None; num_tasks]));
230
231        // Spawn all tasks as parallel tokio tasks, tracking index for ordering
232        let mut handles = Vec::new();
233        let timeout = Duration::from_secs(self.timeout_secs);
234        for (index, task) in tasks.into_iter().enumerate() {
235            let runner = self.runner.clone();
236            let results = Arc::clone(&results);
237            let task_timeout = timeout;
238            let timeout_secs = self.timeout_secs;
239
240            let handle = tokio::spawn(async move {
241                // Wrap task execution in timeout to prevent hanging
242                let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
243                {
244                    Ok(result) => result,
245                    Err(_) => SubagentResult {
246                        task: task.clone(),
247                        result: String::new(),
248                        success: false,
249                        error: Some(format!("Task timed out after {} seconds", timeout_secs)),
250                    },
251                };
252
253                // Store result at original index to preserve ordering
254                let mut guard = results.lock().await;
255                guard[index] = Some(result);
256            });
257
258            handles.push(handle);
259        }
260
261        // Wait for all tasks to complete
262        for handle in handles {
263            if let Err(e) = handle.await {
264                agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
265            }
266        }
267
268        #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
269        let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
270
271        // Extract results, preserving original task order
272        let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
273            .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
274            .into_inner()
275            .into_iter()
276            .enumerate()
277            .map(|(i, opt)| {
278                opt.unwrap_or_else(|| SubagentResult {
279                    task: format!("Task {}", i),
280                    result: String::new(),
281                    success: false,
282                    error: Some("Task did not complete".to_string()),
283                })
284            })
285            .collect();
286
287        let successful = final_results.iter().filter(|r| r.success).count();
288        let failed = final_results.iter().filter(|r| !r.success).count();
289
290        agent_debug::debug_context_management(
291            "ParallelAnalyze",
292            &format!(
293                "{}/{} successful in {}ms",
294                successful, num_tasks, execution_time_ms
295            ),
296        );
297
298        Ok(ParallelAnalyzeResult {
299            results: final_results,
300            successful,
301            failed,
302            execution_time_ms,
303        })
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    use super::*;
310
311    #[test]
312    fn test_parallel_analyze_args_schema() {
313        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
314        let json = serde_json::to_string_pretty(&schema).expect("schema should serialize");
315        assert!(json.contains("tasks"));
316    }
317}