use crate::extensions::types::ToolFn;
use futures::stream::{self, StreamExt};
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use crate::agents::AgentDef;
use crate::commands::spawn::terminal::{find_harness_binary, Harness};
/// Registry of named extension tools; dispatches incoming tool calls to the
/// registered `ToolFn` for that name.
pub struct ExtensionRunner {
    // Tool name -> callable; populated via `register_tool`.
    registered_tools: HashMap<String, ToolFn>,
}
impl ExtensionRunner {
    /// Create a runner with no tools registered.
    pub fn new() -> Self {
        Self {
            registered_tools: HashMap::new(),
        }
    }

    /// Register `tool_fn` under `name`, replacing any previous registration
    /// with the same name.
    pub fn register_tool(&mut self, name: String, tool_fn: ToolFn) {
        self.registered_tools.insert(name, tool_fn);
    }

    /// Invoke the tool registered under `name` with positional `args`.
    ///
    /// Returns `ToolNotFound` if no such tool exists, or `ExecutionError`
    /// wrapping whatever error the tool itself produced.
    pub fn execute_tool(&self, name: &str, args: &[Value]) -> Result<Value, ExtensionRunnerError> {
        match self.registered_tools.get(name) {
            Some(tool_fn) => tool_fn(args).map_err(ExtensionRunnerError::ExecutionError),
            None => Err(ExtensionRunnerError::ToolNotFound(name.to_string())),
        }
    }

    /// Whether a tool named `name` has been registered.
    pub fn has_tool(&self, name: &str) -> bool {
        self.registered_tools.contains_key(name)
    }

    /// Names of all registered tools (in arbitrary map order).
    pub fn list_tools(&self) -> Vec<String> {
        self.registered_tools.keys().cloned().collect()
    }

    /// Dispatch a tool call, normalizing `arguments` into a positional slice:
    /// an array is splatted into individual arguments, `null` becomes no
    /// arguments, and any other value (objects included) becomes a single
    /// argument.
    pub fn on_tool_call(
        &self,
        tool_name: &str,
        arguments: Value,
    ) -> Result<ToolCallResult, ExtensionRunnerError> {
        let args: Vec<Value> = match arguments {
            Value::Array(items) => items,
            Value::Null => Vec::new(),
            single => vec![single],
        };
        let output = self.execute_tool(tool_name, &args)?;
        Ok(ToolCallResult {
            tool_name: tool_name.to_string(),
            output,
            success: true,
        })
    }
}
/// Outcome of a single dispatched tool call.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Name of the tool that was invoked.
    pub tool_name: String,
    /// JSON value returned by the tool.
    pub output: Value,
    /// Always `true` as constructed by `on_tool_call`; failures are surfaced
    /// as `Err` instead of a `false` flag.
    pub success: bool,
}
/// `Default` delegates to `new()`: an empty tool registry.
impl Default for ExtensionRunner {
    fn default() -> Self {
        Self::new()
    }
}
/// Errors produced while dispatching extension tool calls.
#[derive(Debug, thiserror::Error)]
pub enum ExtensionRunnerError {
    /// No tool is registered under the requested name.
    #[error("Tool not found: {0}")]
    ToolNotFound(String),
    /// The tool was found and ran, but returned an error.
    #[error("Tool execution error: {0}")]
    ExecutionError(Box<dyn std::error::Error + Send + Sync>),
}
/// Final outcome of one agent subprocess run.
#[derive(Debug, Clone)]
pub struct AgentResult {
    /// Identifier of the task this agent was running.
    pub task_id: String,
    /// Whether the child process exited successfully.
    pub success: bool,
    /// Process exit code, if the child exited normally (None when killed by
    /// a signal or when waiting failed).
    pub exit_code: Option<i32>,
    /// Captured stdout followed by stderr (stderr lines prefixed "[stderr] ").
    pub output: String,
    /// Wall-clock duration from spawn to exit, in milliseconds.
    pub duration_ms: u64,
}
/// Lifecycle events streamed while an agent subprocess runs.
#[derive(Debug, Clone)]
pub enum AgentEvent {
    /// The child process was spawned successfully.
    Started { task_id: String },
    /// One line of the child's stdout, forwarded as it is read.
    Output { task_id: String, line: String },
    /// The child exited; carries the final result.
    Completed { result: AgentResult },
    /// The child process could not be spawned.
    SpawnFailed { task_id: String, error: String },
}
/// Everything needed to launch one agent subprocess.
#[derive(Debug, Clone)]
pub struct SpawnConfig {
    /// Task identifier; also exported to the child as SCUD_TASK_ID.
    pub task_id: String,
    /// Prompt text passed to the harness CLI.
    pub prompt: String,
    /// Working directory the child runs in.
    pub working_dir: PathBuf,
    /// Which agent harness (CLI) to invoke.
    pub harness: Harness,
    /// Optional model override forwarded as `--model`.
    pub model: Option<String>,
}
pub async fn spawn_agent(
config: SpawnConfig,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<tokio::task::JoinHandle<AgentResult>, anyhow::Error> {
let binary_path = find_harness_binary(config.harness)?;
let task_id = config.task_id.clone();
let mut cmd = match config.harness {
Harness::Claude => {
let mut c = Command::new(binary_path);
c.arg(&config.prompt);
c.arg("--dangerously-skip-permissions");
if let Some(ref model) = config.model {
c.arg("--model").arg(model);
}
c
}
Harness::OpenCode => {
let mut c = Command::new(binary_path);
c.arg("run");
c.arg("--variant").arg("minimal");
if let Some(ref model) = config.model {
c.arg("--model").arg(model);
}
c.arg(&config.prompt);
c
}
Harness::Cursor => {
let mut c = Command::new(binary_path);
c.arg("-p");
if let Some(ref model) = config.model {
c.arg("--model").arg(model);
}
c.arg(&config.prompt);
c
}
Harness::Rho => {
let mut c = Command::new(binary_path);
c.arg("-p").arg(&config.prompt);
c.arg("-C").arg(&config.working_dir);
if let Some(ref model) = config.model {
c.arg("--model").arg(model);
}
c
}
#[cfg(feature = "direct-api")]
Harness::DirectApi => {
let mut c = Command::new(binary_path);
c.arg("agent-exec");
c.arg("--prompt").arg(&config.prompt);
if let Some(ref model) = config.model {
c.arg("--model").arg(model);
}
c
}
};
cmd.current_dir(&config.working_dir);
cmd.env("SCUD_TASK_ID", &config.task_id);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let start_time = std::time::Instant::now();
let mut child = cmd.spawn().map_err(|e| {
anyhow::anyhow!(
"Failed to spawn {} for task {}: {}",
config.harness.name(),
config.task_id,
e
)
})?;
let _ = event_tx
.send(AgentEvent::Started {
task_id: task_id.clone(),
})
.await;
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let event_tx_clone = event_tx.clone();
let task_id_clone = task_id.clone();
let handle = tokio::spawn(async move {
let mut output_buffer = String::new();
if let Some(stdout) = stdout {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
output_buffer.push_str(&line);
output_buffer.push('\n');
let _ = event_tx_clone
.send(AgentEvent::Output {
task_id: task_id_clone.clone(),
line: line.clone(),
})
.await;
}
}
if let Some(stderr) = stderr {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
output_buffer.push_str("[stderr] ");
output_buffer.push_str(&line);
output_buffer.push('\n');
}
}
let status = child.wait().await;
let duration_ms = start_time.elapsed().as_millis() as u64;
let (success, exit_code) = match status {
Ok(s) => (s.success(), s.code()),
Err(_) => (false, None),
};
let result = AgentResult {
task_id: task_id_clone.clone(),
success,
exit_code,
output: output_buffer,
duration_ms,
};
let _ = event_tx_clone
.send(AgentEvent::Completed {
result: result.clone(),
})
.await;
result
});
Ok(handle)
}
/// Resolve the effective (harness, model) pair for an optional named agent.
///
/// If `agent_type` names an agent definition loadable from `working_dir`,
/// that definition's harness and model take precedence; anything it leaves
/// unspecified falls back to `default_harness` / `default_model`. With no
/// agent (or a failed load), the defaults are returned as-is.
pub fn load_agent_config(
    agent_type: Option<&str>,
    default_harness: Harness,
    default_model: Option<&str>,
    working_dir: &Path,
) -> (Harness, Option<String>) {
    let loaded = agent_type
        .and_then(|name| AgentDef::try_load(name, working_dir))
        .map(|def| {
            (
                def.harness().unwrap_or(default_harness),
                def.model().map(String::from),
            )
        });
    match loaded {
        Some((harness, model)) => (
            harness,
            model.or_else(|| default_model.map(String::from)),
        ),
        None => (default_harness, default_model.map(String::from)),
    }
}
/// Tracks a set of in-flight agents and multiplexes their events onto a
/// single channel.
pub struct AgentRunner {
    // Sender half handed to each spawned agent (and clonable via
    // `event_sender`).
    event_tx: mpsc::Sender<AgentEvent>,
    // Receiver half drained via `recv_event` / `try_recv_event`.
    event_rx: mpsc::Receiver<AgentEvent>,
    // Join handles for every agent spawned through this runner.
    handles: Vec<tokio::task::JoinHandle<AgentResult>>,
}
impl AgentRunner {
    /// Create a runner whose event channel buffers up to `capacity` events.
    pub fn new(capacity: usize) -> Self {
        let (event_tx, event_rx) = mpsc::channel(capacity);
        Self {
            event_tx,
            event_rx,
            handles: Vec::new(),
        }
    }

    /// Clone of the sender half, for wiring in external event producers.
    pub fn event_sender(&self) -> mpsc::Sender<AgentEvent> {
        self.event_tx.clone()
    }

    /// Launch an agent for `config` and track its join handle.
    pub async fn spawn(&mut self, config: SpawnConfig) -> anyhow::Result<()> {
        let tx = self.event_tx.clone();
        let handle = spawn_agent(config, tx).await?;
        self.handles.push(handle);
        Ok(())
    }

    /// Await the next event; `None` once every sender has been dropped.
    pub async fn recv_event(&mut self) -> Option<AgentEvent> {
        self.event_rx.recv().await
    }

    /// Non-blocking poll for a pending event.
    pub fn try_recv_event(&mut self) -> Option<AgentEvent> {
        self.event_rx.try_recv().ok()
    }

    /// Wait for every tracked agent to finish and collect the results of
    /// those whose tasks completed (panicked/cancelled tasks are skipped).
    pub async fn wait_all(&mut self) -> Vec<AgentResult> {
        let pending = std::mem::take(&mut self.handles);
        let mut finished = Vec::with_capacity(pending.len());
        for handle in pending {
            if let Ok(result) = handle.await {
                finished.push(result);
            }
        }
        finished
    }

    /// Number of tracked agents that have not yet finished.
    pub fn active_count(&self) -> usize {
        self.handles.iter().filter(|h| !h.is_finished()).count()
    }
}
/// Apply `f` to every item with at most `concurrency` futures in flight,
/// returning results in **completion** order (not input order).
///
/// A `concurrency` of 0 is clamped to 1: `buffer_unordered(0)` would never
/// admit a future into the buffer, so the stream would pend forever.
pub async fn map_with_concurrency_limit<T, F, Fut, R>(
    items: impl IntoIterator<Item = T>,
    concurrency: usize,
    f: F,
) -> Vec<R>
where
    F: Fn(T) -> Fut,
    Fut: Future<Output = R>,
{
    stream::iter(items)
        .map(f)
        .buffer_unordered(concurrency.max(1))
        .collect()
        .await
}
/// Apply `f` to every item with at most `concurrency` futures in flight,
/// returning results in **input** order.
///
/// A `concurrency` of 0 is clamped to 1: `buffered(0)` would never admit a
/// future into the buffer, so the stream would pend forever.
pub async fn map_with_concurrency_limit_ordered<T, F, Fut, R>(
    items: impl IntoIterator<Item = T>,
    concurrency: usize,
    f: F,
) -> Vec<R>
where
    F: Fn(T) -> Fut,
    Fut: Future<Output = R>,
{
    stream::iter(items)
        .map(f)
        .buffered(concurrency.max(1))
        .collect()
        .await
}
pub async fn spawn_agents_with_limit(
configs: impl IntoIterator<Item = SpawnConfig>,
concurrency: usize,
event_tx: mpsc::Sender<AgentEvent>,
) -> Vec<Result<AgentResult, anyhow::Error>> {
let configs: Vec<_> = configs.into_iter().collect();
map_with_concurrency_limit(configs, concurrency, |config| {
let tx = event_tx.clone();
async move {
match spawn_agent(config, tx).await {
Ok(handle) => handle
.await
.map_err(|e| anyhow::anyhow!("Join error: {}", e)),
Err(e) => Err(e),
}
}
})
.await
}
/// Tuning knobs for `spawn_agents_concurrent`.
#[derive(Debug, Clone)]
pub struct ConcurrentSpawnConfig {
    /// Maximum number of agents in flight at once.
    pub max_concurrent: usize,
    /// Per-agent timeout in milliseconds; 0 disables the timeout.
    pub timeout_ms: u64,
    // NOTE(review): this flag is not consulted anywhere in this file —
    // confirm whether fail-fast cancellation is implemented elsewhere or
    // still pending.
    pub fail_fast: bool,
}
/// Defaults: 5 concurrent agents, no timeout, no fail-fast.
impl Default for ConcurrentSpawnConfig {
    fn default() -> Self {
        Self {
            max_concurrent: 5,
            timeout_ms: 0,
            fail_fast: false,
        }
    }
}
/// Aggregated outcome of a `spawn_agents_concurrent` batch.
#[derive(Debug)]
pub struct ConcurrentSpawnResult {
    /// Results of agents that ran to completion.
    pub successes: Vec<AgentResult>,
    /// `(task_id, error message)` for agents that failed to spawn, join,
    /// or timed out.
    pub failures: Vec<(String, String)>,
    /// True iff `failures` is empty.
    pub all_succeeded: bool,
}
/// Spawn all `configs` with bounded concurrency and partition the outcomes
/// into successes and `(task_id, error)` failures.
///
/// A per-agent timeout applies when `spawn_config.timeout_ms > 0`; a timed
/// out agent is recorded as a failure with the message "Timeout".
// NOTE(review): `spawn_config.fail_fast` is not consulted here — confirm
// whether fail-fast cancellation is expected at a higher level.
pub async fn spawn_agents_concurrent(
    configs: Vec<SpawnConfig>,
    spawn_config: ConcurrentSpawnConfig,
    event_tx: mpsc::Sender<AgentEvent>,
) -> ConcurrentSpawnResult {
    // 0 means "no timeout". Normalize to an Option once so the spawn/join
    // pipeline below is written a single time instead of being duplicated
    // across a timeout branch and a no-timeout branch.
    let timeout = (spawn_config.timeout_ms > 0)
        .then(|| std::time::Duration::from_millis(spawn_config.timeout_ms));
    let results = map_with_concurrency_limit(configs, spawn_config.max_concurrent, |config| {
        let tx = event_tx.clone();
        let task_id = config.task_id.clone();
        async move {
            // Spawn the agent and await its task; either step can fail.
            let run = async {
                match spawn_agent(config, tx).await {
                    Ok(handle) => handle
                        .await
                        .map_err(|e| anyhow::anyhow!("Join error: {}", e)),
                    Err(e) => Err(e),
                }
            };
            let outcome = match timeout {
                Some(limit) => match tokio::time::timeout(limit, run).await {
                    Ok(inner) => inner,
                    Err(_) => return Err((task_id, "Timeout".to_string())),
                },
                None => run.await,
            };
            outcome.map_err(|e| (task_id, e.to_string()))
        }
    })
    .await;
    let mut successes = Vec::new();
    let mut failures = Vec::new();
    for result in results {
        match result {
            Ok(agent_result) => successes.push(agent_result),
            Err(failure) => failures.push(failure),
        }
    }
    let all_succeeded = failures.is_empty();
    ConcurrentSpawnResult {
        successes,
        failures,
        all_succeeded,
    }
}
/// Run a single agent to completion, discarding all streamed events.
///
/// # Errors
/// Fails if the agent cannot be spawned or its task panics/aborts.
pub async fn spawn_subagent(
    task_id: String,
    prompt: String,
    working_dir: PathBuf,
    harness: Harness,
    model: Option<String>,
) -> Result<AgentResult, anyhow::Error> {
    let (tx, rx) = mpsc::channel(10);
    // Drop the receiver up front so every event send fails immediately
    // (and is ignored by the sender). Keeping an unread receiver alive for
    // the whole function — `let (tx, _rx) = ...` — would let the bounded
    // channel fill after 10 events, after which the agent's output loop
    // blocks on `send` while we block on `handle.await`: a deadlock for
    // any agent emitting more than a handful of lines.
    drop(rx);
    let config = SpawnConfig {
        task_id,
        prompt,
        working_dir,
        harness,
        model,
    };
    let handle = spawn_agent(config, tx).await?;
    handle
        .await
        .map_err(|e| anyhow::anyhow!("Subagent join error: {}", e))
}
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh runner starts with no registered tools.
    #[test]
    fn test_extension_runner_new() {
        let runner = ExtensionRunner::new();
        assert!(runner.list_tools().is_empty());
    }

    // AgentResult fields round-trip as constructed.
    #[test]
    fn test_agent_result_debug() {
        let result = AgentResult {
            task_id: "test:1".to_string(),
            success: true,
            exit_code: Some(0),
            output: "test output".to_string(),
            duration_ms: 1000,
        };
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert_eq!(result.task_id, "test:1");
    }

    // SpawnConfig fields round-trip as constructed.
    #[test]
    fn test_spawn_config_debug() {
        let config = SpawnConfig {
            task_id: "test:1".to_string(),
            prompt: "do something".to_string(),
            working_dir: PathBuf::from("/tmp"),
            harness: Harness::Claude,
            model: Some("opus".to_string()),
        };
        assert_eq!(config.task_id, "test:1");
        assert_eq!(config.harness, Harness::Claude);
    }

    // A new AgentRunner tracks no active agents.
    #[tokio::test]
    async fn test_agent_runner_new() {
        let runner = AgentRunner::new(100);
        assert_eq!(runner.active_count(), 0);
    }

    // ToolCallResult fields round-trip as constructed.
    #[test]
    fn test_tool_call_result() {
        let result = ToolCallResult {
            tool_name: "my_tool".to_string(),
            output: serde_json::json!({"key": "value"}),
            success: true,
        };
        assert_eq!(result.tool_name, "my_tool");
        assert!(result.success);
        assert_eq!(result.output["key"], "value");
    }

    // Calling an unregistered tool yields ToolNotFound with the name.
    #[test]
    fn test_on_tool_call_not_found() {
        let runner = ExtensionRunner::new();
        let result = runner.on_tool_call("nonexistent", serde_json::json!({}));
        assert!(result.is_err());
        match result {
            Err(ExtensionRunnerError::ToolNotFound(name)) => {
                assert_eq!(name, "nonexistent");
            }
            _ => panic!("Expected ToolNotFound error"),
        }
    }

    // A registered tool receives the (object-wrapped) arguments and its
    // return value is surfaced in the ToolCallResult.
    #[test]
    fn test_on_tool_call_with_registered_tool() {
        let mut runner = ExtensionRunner::new();
        fn echo_tool(args: &[Value]) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
            Ok(args.first().cloned().unwrap_or(Value::Null))
        }
        runner.register_tool("echo".to_string(), echo_tool);
        let result = runner
            .on_tool_call("echo", serde_json::json!({"test": 123}))
            .unwrap();
        assert_eq!(result.tool_name, "echo");
        assert!(result.success);
        assert_eq!(result.output["test"], 123);
    }

    // Argument normalization: array -> splatted, object -> single arg,
    // null -> no args, scalar -> single arg.
    #[test]
    fn test_on_tool_call_argument_conversion() {
        let mut runner = ExtensionRunner::new();
        fn count_args(args: &[Value]) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
            Ok(serde_json::json!(args.len()))
        }
        runner.register_tool("count".to_string(), count_args);
        let result = runner
            .on_tool_call("count", serde_json::json!([1, 2, 3]))
            .unwrap();
        assert_eq!(result.output, 3);
        let result = runner
            .on_tool_call("count", serde_json::json!({"a": 1}))
            .unwrap();
        assert_eq!(result.output, 1);
        let result = runner.on_tool_call("count", Value::Null).unwrap();
        assert_eq!(result.output, 0);
        let result = runner.on_tool_call("count", serde_json::json!(42)).unwrap();
        assert_eq!(result.output, 1);
    }

    // All items are processed, and the observed in-flight count never
    // exceeds the requested concurrency limit.
    #[tokio::test]
    async fn test_map_with_concurrency_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;
        let items: Vec<i32> = (0..10).collect();
        let counter = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));
        let results = map_with_concurrency_limit(items, 3, |n| {
            let counter = Arc::clone(&counter);
            let max_concurrent = Arc::clone(&max_concurrent);
            async move {
                let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
                // CAS loop: record the high-water mark of concurrent tasks.
                let mut max = max_concurrent.load(Ordering::SeqCst);
                while current > max {
                    match max_concurrent.compare_exchange_weak(
                        max,
                        current,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => break,
                        Err(new_max) => max = new_max,
                    }
                }
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                counter.fetch_sub(1, Ordering::SeqCst);
                n * 2
            }
        })
        .await;
        assert_eq!(results.len(), 10);
        // Results arrive in completion order; sort before comparing.
        let mut sorted: Vec<i32> = results;
        sorted.sort();
        assert_eq!(sorted, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
        assert!(max_concurrent.load(Ordering::SeqCst) <= 3);
    }

    // The ordered variant preserves input order even when later items
    // finish first (earlier items sleep longer).
    #[tokio::test]
    async fn test_map_with_concurrency_limit_ordered() {
        let items: Vec<i32> = vec![1, 2, 3, 4, 5];
        let results = map_with_concurrency_limit_ordered(items, 2, |n| async move {
            tokio::time::sleep(std::time::Duration::from_millis((5 - n) as u64 * 5)).await;
            n * 10
        })
        .await;
        assert_eq!(results, vec![10, 20, 30, 40, 50]);
    }

    // Defaults: 5 concurrent, no timeout, no fail-fast.
    #[test]
    fn test_concurrent_spawn_config_default() {
        let config = ConcurrentSpawnConfig::default();
        assert_eq!(config.max_concurrent, 5);
        assert_eq!(config.timeout_ms, 0);
        assert!(!config.fail_fast);
    }

    // Result aggregate with one success and no failures.
    #[test]
    fn test_concurrent_spawn_result() {
        let result = ConcurrentSpawnResult {
            successes: vec![AgentResult {
                task_id: "1".to_string(),
                success: true,
                exit_code: Some(0),
                output: "done".to_string(),
                duration_ms: 100,
            }],
            failures: vec![],
            all_succeeded: true,
        };
        assert!(result.all_succeeded);
        assert_eq!(result.successes.len(), 1);
        assert!(result.failures.is_empty());
    }

    // Result aggregate with a single (task_id, error) failure.
    #[test]
    fn test_concurrent_spawn_result_with_failures() {
        let result = ConcurrentSpawnResult {
            successes: vec![],
            failures: vec![("task1".to_string(), "error msg".to_string())],
            all_succeeded: false,
        };
        assert!(!result.all_succeeded);
        assert!(result.successes.is_empty());
        assert_eq!(result.failures.len(), 1);
        assert_eq!(result.failures[0].0, "task1");
    }
}