1use anyhow::Result;
8use rig::{
9 client::{CompletionClient, ProviderClient},
10 completion::{Prompt, ToolDefinition},
11 providers::{anthropic, openai},
12 tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Mutex;
20
21use crate::agents::debug as agent_debug;
22
/// Default wall-clock timeout applied to each subagent task, in seconds.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
25
// Arguments accepted by the `parallel_analyze` tool.
// (Plain `//` comments are used here on purpose: this type derives
// `JsonSchema`, and schemars lifts `///` doc comments into schema
// descriptions, which would change the generated schema output.)
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    // Analysis task prompts to run concurrently; each runs in its own subagent.
    pub tasks: Vec<String>,
}
34
/// Outcome of a single subagent run.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The task prompt this subagent was given.
    pub task: String,
    /// The subagent's response text (empty when the task failed).
    pub result: String,
    /// Whether the task completed without error.
    pub success: bool,
    /// Error description when `success` is false, `None` otherwise.
    pub error: Option<String>,
}
47
/// Aggregated output of one `parallel_analyze` invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// Per-task results, in the same order as the input tasks.
    pub results: Vec<SubagentResult>,
    /// Number of tasks that completed successfully.
    pub successful: usize,
    /// Number of tasks that failed (provider error, timeout, or panic).
    pub failed: usize,
    /// Total wall-clock time for the whole batch, in milliseconds.
    pub execution_time_ms: u64,
}
60
/// Provider-specific client + model pair used to run subagent prompts.
/// `Clone` is required because a copy is moved into each spawned task.
#[derive(Clone)]
enum SubagentRunner {
    /// OpenAI-backed subagent.
    OpenAI {
        client: openai::Client,
        model: String,
    },
    /// Anthropic-backed subagent.
    Anthropic {
        client: anthropic::Client,
        model: String,
    },
}
73
74impl SubagentRunner {
75 fn new(provider: &str, model: &str) -> Result<Self> {
76 match provider {
77 "openai" => {
78 let client = openai::Client::from_env();
79 Ok(Self::OpenAI {
80 client,
81 model: model.to_string(),
82 })
83 }
84 "anthropic" => {
85 let client = anthropic::Client::from_env();
86 Ok(Self::Anthropic {
87 client,
88 model: model.to_string(),
89 })
90 }
91 _ => Err(anyhow::anyhow!(
92 "Unsupported provider for parallel analysis: {}",
93 provider
94 )),
95 }
96 }
97
98 async fn run_task(&self, task: &str) -> SubagentResult {
99 let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
100 task thoroughly and return a focused summary.\n\n\
101 Guidelines:\n\
102 - Use the available tools to gather necessary information\n\
103 - Focus only on what's asked\n\
104 - Return a clear, structured summary\n\
105 - Be concise but comprehensive";
106
107 let result = match self {
109 Self::OpenAI { client, model } => {
110 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
111 let agent = crate::attach_core_tools!(builder).build();
112 agent.prompt(task).await
113 }
114 Self::Anthropic { client, model } => {
115 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
116 let agent = crate::attach_core_tools!(builder).build();
117 agent.prompt(task).await
118 }
119 };
120
121 match result {
122 Ok(response) => SubagentResult {
123 task: task.to_string(),
124 result: response,
125 success: true,
126 error: None,
127 },
128 Err(e) => SubagentResult {
129 task: task.to_string(),
130 result: String::new(),
131 success: false,
132 error: Some(e.to_string()),
133 },
134 }
135 }
136}
137
/// Tool that fans independent analysis prompts out to parallel subagents.
pub struct ParallelAnalyze {
    // Provider client used to run each subagent task.
    runner: SubagentRunner,
    // Model name, reported in debug output by `call`.
    model: String,
    // Per-task wall-clock timeout, in seconds.
    timeout_secs: u64,
}
146
147impl ParallelAnalyze {
148 pub fn new(provider: &str, model: &str) -> Result<Self> {
150 Self::with_timeout(provider, model, DEFAULT_SUBAGENT_TIMEOUT_SECS)
151 }
152
153 pub fn with_timeout(provider: &str, model: &str, timeout_secs: u64) -> Result<Self> {
155 let runner = SubagentRunner::new(provider, model).or_else(|e| {
157 tracing::warn!(
158 "Failed to create {} runner: {}, falling back to openai",
159 provider,
160 e
161 );
162 SubagentRunner::new("openai", "gpt-4o")
163 })?;
164
165 Ok(Self {
166 runner,
167 model: model.to_string(),
168 timeout_secs,
169 })
170 }
171}
172
// Declares `ParallelAnalyzeError`, this tool's error type, via the shared
// project macro (see `define_tool_error!` for the generated shape).
crate::define_tool_error!(ParallelAnalyzeError);
175
176impl Tool for ParallelAnalyze {
177 const NAME: &'static str = "parallel_analyze";
178 type Error = ParallelAnalyzeError;
179 type Args = ParallelAnalyzeArgs;
180 type Output = ParallelAnalyzeResult;
181
182 async fn definition(&self, _prompt: String) -> ToolDefinition {
183 ToolDefinition {
184 name: Self::NAME.to_string(),
185 description: "Run multiple analysis tasks in parallel using independent subagents. \
186 Each subagent has its own context window, preventing overflow when \
187 analyzing large changesets. Use this when you have multiple independent \
188 analysis tasks that can run concurrently.\n\n\
189 Best for:\n\
190 - Analyzing different directories/modules separately\n\
191 - Processing many commits in batches\n\
192 - Running different types of analysis (security, performance, style) in parallel\n\n\
193 Each task should be a focused prompt. Results are aggregated and returned."
194 .to_string(),
195 parameters: json!({
196 "type": "object",
197 "properties": {
198 "tasks": {
199 "type": "array",
200 "items": { "type": "string" },
201 "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
202 "minItems": 1,
203 "maxItems": 10
204 }
205 },
206 "required": ["tasks"]
207 }),
208 }
209 }
210
211 #[allow(clippy::cognitive_complexity)]
212 async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
213 use std::time::Instant;
214
215 let start = Instant::now();
216 let tasks = args.tasks;
217 let num_tasks = tasks.len();
218
219 agent_debug::debug_context_management(
220 "ParallelAnalyze",
221 &format!(
222 "Spawning {} subagents (fast model: {})",
223 num_tasks, self.model
224 ),
225 );
226
227 let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
229 Arc::new(Mutex::new(vec![None; num_tasks]));
230
231 let mut handles = Vec::new();
233 let timeout = Duration::from_secs(self.timeout_secs);
234 for (index, task) in tasks.into_iter().enumerate() {
235 let runner = self.runner.clone();
236 let results = Arc::clone(&results);
237 let task_timeout = timeout;
238 let timeout_secs = self.timeout_secs;
239
240 let handle = tokio::spawn(async move {
241 let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
243 {
244 Ok(result) => result,
245 Err(_) => SubagentResult {
246 task: task.clone(),
247 result: String::new(),
248 success: false,
249 error: Some(format!("Task timed out after {} seconds", timeout_secs)),
250 },
251 };
252
253 let mut guard = results.lock().await;
255 guard[index] = Some(result);
256 });
257
258 handles.push(handle);
259 }
260
261 for handle in handles {
263 if let Err(e) = handle.await {
264 agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
265 }
266 }
267
268 #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
269 let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
270
271 let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
273 .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
274 .into_inner()
275 .into_iter()
276 .enumerate()
277 .map(|(i, opt)| {
278 opt.unwrap_or_else(|| SubagentResult {
279 task: format!("Task {}", i),
280 result: String::new(),
281 success: false,
282 error: Some("Task did not complete".to_string()),
283 })
284 })
285 .collect();
286
287 let successful = final_results.iter().filter(|r| r.success).count();
288 let failed = final_results.iter().filter(|r| !r.success).count();
289
290 agent_debug::debug_context_management(
291 "ParallelAnalyze",
292 &format!(
293 "{}/{} successful in {}ms",
294 successful, num_tasks, execution_time_ms
295 ),
296 );
297
298 Ok(ParallelAnalyzeResult {
299 results: final_results,
300 successful,
301 failed,
302 execution_time_ms,
303 })
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived argument schema must surface the `tasks` property name so
    /// the LLM sees the expected parameter.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let rendered =
            serde_json::to_string_pretty(&schema).expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}