use anyhow::Result;
use rig::{
client::{CompletionClient, ProviderClient},
completion::{Prompt, ToolDefinition},
providers::{anthropic, gemini, openai},
tool::Tool,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use crate::agents::debug as agent_debug;
use crate::agents::provider::{
CompletionProfile, apply_completion_params, provider_from_name, resolve_api_key,
};
use crate::providers::Provider;
/// Per-task wall-clock timeout (seconds) used by `ParallelAnalyze::new`
/// when the caller does not supply one via `with_timeout`.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
/// Arguments accepted by the `parallel_analyze` tool (deserialized from the
/// model's JSON tool call; schema is generated via `JsonSchema`).
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// One focused analysis prompt per subagent; each task runs concurrently
    /// in its own subagent with an independent context window.
    pub tasks: Vec<String>,
}
/// Outcome of a single subagent task. Exactly one of `result` / `error` is
/// meaningful depending on `success`; failures never abort the whole batch.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The original task prompt this result corresponds to.
    pub task: String,
    /// The subagent's response text; empty when `success` is false.
    pub result: String,
    /// Whether the task completed without error or timeout.
    pub success: bool,
    /// Failure description (provider error, timeout, or panic); `None` on success.
    pub error: Option<String>,
}
/// Aggregated output of one `parallel_analyze` invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// Per-task results, in the same order as the input `tasks` list.
    pub results: Vec<SubagentResult>,
    /// Count of results with `success == true`.
    pub successful: usize,
    /// Count of results with `success == false` (errors, timeouts, panics).
    pub failed: usize,
    /// Total wall-clock time for the whole batch, in milliseconds.
    pub execution_time_ms: u64,
}
/// Provider-specific subagent executor. One variant per supported backend;
/// each carries its own client type plus the model id and extra completion
/// parameters. `Clone` is required so each spawned task can own a runner.
#[derive(Clone)]
enum SubagentRunner {
    OpenAI {
        client: openai::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    Anthropic {
        client: anthropic::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    Gemini {
        client: gemini::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
}
impl SubagentRunner {
    /// Constructs a runner for the named provider, carrying the model id and
    /// any provider-specific completion parameters.
    ///
    /// # Errors
    /// Returns an error for unrecognized provider names or when the
    /// underlying provider client cannot be created.
    fn new(
        provider: &str,
        model: &str,
        api_key: Option<&str>,
        additional_params: HashMap<String, String>,
    ) -> Result<Self> {
        match provider {
            "openai" => {
                let client = Self::resolve_openai_client(api_key)?;
                Ok(Self::OpenAI {
                    client,
                    model: model.to_string(),
                    additional_params,
                })
            }
            "anthropic" => {
                let client = Self::resolve_anthropic_client(api_key)?;
                Ok(Self::Anthropic {
                    client,
                    model: model.to_string(),
                    additional_params,
                })
            }
            // Accept both canonical and colloquial names for Google's provider.
            "google" | "gemini" => {
                let client = Self::resolve_gemini_client(api_key)?;
                Ok(Self::Gemini {
                    client,
                    model: model.to_string(),
                    additional_params,
                })
            }
            _ => Err(anyhow::anyhow!(
                "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
                provider
            )),
        }
    }

    /// Builds an OpenAI client from an explicitly resolved key when one is
    /// available, otherwise falling back to environment-based configuration.
    fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
        let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
        match resolved_key {
            // Preserve the underlying error so misconfiguration is diagnosable
            // (the previous generic message discarded the cause).
            Some(key) => openai::Client::new(&key)
                .map_err(|e| anyhow::anyhow!("Failed to create OpenAI client: {}", e)),
            None => Ok(openai::Client::from_env()),
        }
    }

    /// Builds an Anthropic client; same key-resolution strategy as OpenAI.
    fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
        match resolved_key {
            Some(key) => anthropic::Client::new(&key)
                .map_err(|e| anyhow::anyhow!("Failed to create Anthropic client: {}", e)),
            None => Ok(anthropic::Client::from_env()),
        }
    }

    /// Builds a Gemini client; same key-resolution strategy as OpenAI.
    fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
        match resolved_key {
            Some(key) => gemini::Client::new(&key)
                .map_err(|e| anyhow::anyhow!("Failed to create Gemini client: {}", e)),
            None => Ok(gemini::Client::from_env()),
        }
    }

    /// Runs a single analysis task against this runner's provider and converts
    /// the outcome into a `SubagentResult`. Errors are captured in the result,
    /// never propagated, so one failing task cannot sink a batch.
    async fn run_task(&self, task: &str) -> SubagentResult {
        let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
                        task thoroughly and return a focused summary.\n\n\
                        Guidelines:\n\
                        - Use the available tools to gather necessary information\n\
                        - Focus only on what's asked\n\
                        - Return a clear, structured summary\n\
                        - Be concise but comprehensive";
        // The three arms are structurally identical but must stay separate:
        // each provider's client/agent-builder is a distinct concrete type.
        let result = match self {
            Self::OpenAI {
                client,
                model,
                additional_params,
            } => {
                let builder = client.agent(model).preamble(preamble);
                let builder = apply_completion_params(
                    builder,
                    Provider::OpenAI,
                    model,
                    4096,
                    Some(additional_params),
                    CompletionProfile::Subagent,
                );
                let agent = crate::attach_core_tools!(builder).build();
                agent.prompt(task).await
            }
            Self::Anthropic {
                client,
                model,
                additional_params,
            } => {
                let builder = client.agent(model).preamble(preamble);
                let builder = apply_completion_params(
                    builder,
                    Provider::Anthropic,
                    model,
                    4096,
                    Some(additional_params),
                    CompletionProfile::Subagent,
                );
                let agent = crate::attach_core_tools!(builder).build();
                agent.prompt(task).await
            }
            Self::Gemini {
                client,
                model,
                additional_params,
            } => {
                let builder = client.agent(model).preamble(preamble);
                let builder = apply_completion_params(
                    builder,
                    Provider::Google,
                    model,
                    4096,
                    Some(additional_params),
                    CompletionProfile::Subagent,
                );
                let agent = crate::attach_core_tools!(builder).build();
                agent.prompt(task).await
            }
        };
        match result {
            Ok(response) => SubagentResult {
                task: task.to_string(),
                result: response,
                success: true,
                error: None,
            },
            Err(e) => SubagentResult {
                task: task.to_string(),
                result: String::new(),
                success: false,
                error: Some(e.to_string()),
            },
        }
    }
}
/// The `parallel_analyze` tool: fans independent analysis prompts out to
/// concurrent subagents, each with its own context window.
pub struct ParallelAnalyze {
    // Provider-specific executor cloned into each spawned task.
    runner: SubagentRunner,
    // Model id, kept for debug logging (the runner holds its own copy).
    model: String,
    // Per-task timeout in seconds.
    timeout_secs: u64,
}
impl ParallelAnalyze {
    /// Builds the tool with the default per-task timeout and no extra
    /// completion parameters.
    pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
        Self::with_timeout(
            provider,
            model,
            DEFAULT_SUBAGENT_TIMEOUT_SECS,
            api_key,
            None,
        )
    }

    /// Builds the tool with an explicit per-task timeout and optional
    /// provider-specific completion parameters.
    ///
    /// # Errors
    /// Fails when the provider name is unknown or the runner cannot be
    /// constructed (e.g. bad credentials).
    pub fn with_timeout(
        provider: &str,
        model: &str,
        timeout_secs: u64,
        api_key: Option<&str>,
        additional_params: Option<HashMap<String, String>>,
    ) -> Result<Self> {
        // Normalize the user-supplied provider string to its canonical form.
        let canonical = provider_from_name(provider)?;
        let params = additional_params.unwrap_or_default();
        let runner = match SubagentRunner::new(canonical.name(), model, api_key, params) {
            Ok(runner) => runner,
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "Failed to create {} runner: {}. Check API key and network connectivity.",
                    provider,
                    e
                ));
            }
        };
        Ok(Self {
            runner,
            model: model.to_string(),
            timeout_secs,
        })
    }
}
// Generates the `ParallelAnalyzeError` newtype used as this tool's error type.
crate::define_tool_error!(ParallelAnalyzeError);
impl Tool for ParallelAnalyze {
    const NAME: &'static str = "parallel_analyze";
    type Error = ParallelAnalyzeError;
    type Args = ParallelAnalyzeArgs;
    type Output = ParallelAnalyzeResult;

    /// Describes the tool and its JSON argument schema to the host model.
    async fn definition(&self, _prompt: String) -> ToolDefinition {
        ToolDefinition {
            name: Self::NAME.to_string(),
            description: "Run multiple analysis tasks in parallel using independent subagents. \
                Each subagent has its own context window, preventing overflow when \
                analyzing large changesets. Use this when you have multiple independent \
                analysis tasks that can run concurrently.\n\n\
                Best for:\n\
                - Analyzing different directories/modules separately\n\
                - Processing many commits in batches\n\
                - Running different types of analysis (security, performance, style) in parallel\n\n\
                Each task should be a focused prompt. Results are aggregated and returned."
                .to_string(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "tasks": {
                        "type": "array",
                        "items": { "type": "string" },
                        "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
                        "minItems": 1,
                        "maxItems": 10
                    }
                },
                "required": ["tasks"]
            }),
        }
    }

    /// Spawns one subagent task per prompt, enforces a per-task timeout, and
    /// aggregates the results in the original input order. Individual
    /// failures (errors, timeouts, panics) are recorded per task and never
    /// fail the batch as a whole.
    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
        use std::time::Instant;

        let start = Instant::now();
        let tasks = args.tasks;
        let num_tasks = tasks.len();
        agent_debug::debug_context_management(
            "ParallelAnalyze",
            &format!(
                "Spawning {} subagents (fast model: {})",
                num_tasks, self.model
            ),
        );

        // Each spawned task resolves to its own SubagentResult, so awaiting
        // the handles in spawn order preserves input ordering without any
        // shared mutable state (no Arc/Mutex, no fallible unwrap at the end).
        let timeout = Duration::from_secs(self.timeout_secs);
        let mut handles = Vec::with_capacity(num_tasks);
        for task in tasks {
            let runner = self.runner.clone();
            let timeout_secs = self.timeout_secs;
            handles.push(tokio::spawn(async move {
                match tokio::time::timeout(timeout, runner.run_task(&task)).await {
                    Ok(result) => result,
                    // Timeout: surface a failed result instead of aborting the batch.
                    Err(_) => SubagentResult {
                        task,
                        result: String::new(),
                        success: false,
                        error: Some(format!("Task timed out after {} seconds", timeout_secs)),
                    },
                }
            }));
        }

        let mut final_results = Vec::with_capacity(num_tasks);
        for (index, handle) in handles.into_iter().enumerate() {
            match handle.await {
                Ok(result) => final_results.push(result),
                // Join error means the task panicked; record a placeholder
                // failure so the output still has one entry per input task.
                Err(e) => {
                    agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
                    final_results.push(SubagentResult {
                        task: format!("Task {}", index),
                        result: String::new(),
                        success: false,
                        error: Some("Task did not complete".to_string()),
                    });
                }
            }
        }

        // Clamp (rather than silently truncate) if elapsed millis exceed u64.
        let execution_time_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
        let successful = final_results.iter().filter(|r| r.success).count();
        let failed = final_results.len() - successful;
        agent_debug::debug_context_management(
            "ParallelAnalyze",
            &format!(
                "{}/{} successful in {}ms",
                successful, num_tasks, execution_time_ms
            ),
        );
        Ok(ParallelAnalyzeResult {
            results: final_results,
            successful,
            failed,
            execution_time_ms,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated JSON schema must expose the `tasks` property that the
    /// tool's `definition()` advertises.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let rendered = serde_json::to_string_pretty(&schema).expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}