use crate::mcp::{McpFunction, McpToolCall, McpToolResult};
use crate::session::background_jobs::{BackgroundJobManager, CompletedJob, JobHandle};
use anyhow::Result;
use futures::future::BoxFuture;
use serde_json::{json, Value};
use std::sync::{Arc, OnceLock};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::watch;
static JOB_MANAGER: OnceLock<Arc<BackgroundJobManager>> = OnceLock::new();
fn get_max_concurrent_jobs() -> usize {
std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(4)
}
pub fn init_job_manager() {
if let Some(session_id) = crate::session::context::current_session_id() {
crate::session::context::init_job_manager_for_session(&session_id);
return;
}
let manager = BackgroundJobManager::new(get_max_concurrent_jobs());
let _ = JOB_MANAGER.set(Arc::new(manager));
}
pub fn get_job_manager() -> Option<Arc<BackgroundJobManager>> {
if let Some(manager) = crate::session::context::get_job_manager_for_session() {
return Some(manager);
}
JOB_MANAGER.get().cloned()
}
pub fn get_all_functions(config: &crate::config::Config) -> Vec<McpFunction> {
config
.agents
.iter()
.map(|agent_config| McpFunction {
name: format!("agent_{}", agent_config.name),
description: format!(
"{}\n\n\
## Async Execution\n\n\
**async: false** (default) — blocks until complete, result returned immediately.\n\
**async: true** — returns immediately, result injected as a user message when done.\n\n\
Use async when: task takes 30+ seconds, or you can continue other work while waiting.\n\
Use sync when: you need the result before your next action.\n\n\
Result format: `[Async agent 'name' completed]` or `[Async agent 'name' failed]`\n\
Max {} concurrent async jobs. Jobs cancelled on session exit.",
agent_config.description,
get_max_concurrent_jobs()
),
parameters: json!({
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "Task description in human language for the agent to process"
},
"async": {
"type": "boolean",
"description": "Run asynchronously. Result injected as user message when complete. Use for long-running tasks where you can continue other work. Default: false.",
"default": false
}
},
"required": ["task"]
}),
})
.collect()
}
pub async fn execute_agent_command(
call: &McpToolCall,
config: &crate::config::Config,
_cancellation_token: Option<tokio::sync::watch::Receiver<bool>>,
) -> Result<McpToolResult> {
let agent_name = match call.tool_name.strip_prefix("agent_") {
Some(name) => name,
None => {
return Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
format!("Invalid agent tool name: {}", call.tool_name),
));
}
};
let task = match call.parameters.get("task").and_then(|v| v.as_str()) {
Some(t) if !t.trim().is_empty() => t,
_ => {
return Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
"Agent tool requires a non-empty 'task' parameter".to_string(),
));
}
};
let config_agent = config.agents.iter().find(|a| a.name == agent_name).cloned();
let dynamic_agent = crate::mcp::core::dynamic_agents::get_enabled_agent(agent_name);
match (config_agent, dynamic_agent) {
(Some(agent), None) => {
execute_config_agent(call, &agent, task, config).await
}
(None, Some(agent)) => {
execute_dynamic_agent(call, &agent, task, config).await
}
(None, None) => Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
format!("Agent '{agent_name}' not configured or not enabled"),
)),
(Some(_), Some(_)) => {
Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
format!(
"Agent '{agent_name}' exists in both config and dynamic agents - ambiguous"
),
))
}
}
}
async fn execute_config_agent(
call: &McpToolCall,
agent_config: &crate::config::agents::AgentConfig,
task: &str,
_config: &crate::config::Config,
) -> Result<McpToolResult> {
let run_async = call
.parameters
.get("async")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let session_workdir = crate::mcp::get_thread_working_directory();
let workdir = agent_config.get_resolved_workdir(&session_workdir);
if run_async {
let manager = match get_job_manager() {
Some(m) => m,
None => {
return Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
"Async job manager not initialised (no active session)".to_string(),
));
}
};
if let Err(active) = manager.try_acquire() {
return Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
format!("Async job limit reached ({active}/{} active). Wait for existing jobs to complete.", get_max_concurrent_jobs()),
));
}
let (cancel_tx, cancel_rx) = watch::channel(false);
let mgr = Arc::clone(&manager);
let command = agent_config.command.clone();
let agent_name_owned = agent_config.name.clone();
let task_owned = task.to_string();
let workdir_owned = workdir.to_path_buf();
let session_id = crate::session::context::current_session_id();
let handle = tokio::spawn(async move {
let run = async move {
let output =
match run_acp_command(&command, &task_owned, &workdir_owned, cancel_rx).await {
Ok(text) => text,
Err(e) => format!("ERROR: {e:#}"),
};
mgr.release(CompletedJob {
agent_name: agent_name_owned,
output,
});
};
if let Some(sid) = session_id {
crate::session::context::with_session_id(sid, run).await;
} else {
run.await;
}
});
manager.register_job(JobHandle {
cancel_tx,
task_handle: handle,
});
return Ok(McpToolResult::success(
call.tool_name.clone(),
call.tool_id.clone(),
"Agent task started asynchronously. The result will be injected into this conversation automatically when ready.".to_string(),
));
}
match run_acp_command(
&agent_config.command,
task,
&workdir,
watch::channel(false).1,
)
.await
{
Ok(output) => Ok(McpToolResult::success(
call.tool_name.clone(),
call.tool_id.clone(),
output,
)),
Err(e) => Ok(McpToolResult::error(
call.tool_name.clone(),
call.tool_id.clone(),
format!("Agent execution failed: {e:#}"),
)),
}
}
async fn execute_dynamic_agent(
call: &McpToolCall,
agent_config: &crate::mcp::core::dynamic_agents::DynamicAgentConfig,
task: &str,
config: &crate::config::Config,
) -> Result<McpToolResult> {
let run_async = call
.parameters
.get("async")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let agent_config_owned = agent_config.clone();
let merged_config = build_agent_config(&agent_config_owned, config);
let tool_name = call.tool_name.clone();
let tool_id = call.tool_id.clone();
let task_owned = task.to_string();
if run_async {
let manager = match get_job_manager() {
Some(m) => m,
None => {
return Ok(McpToolResult::error(
tool_name,
tool_id,
"Async job manager not initialised (no active session)".to_string(),
));
}
};
if let Err(active) = manager.try_acquire() {
return Ok(McpToolResult::error(
tool_name,
tool_id,
format!(
"Async job limit reached ({active}/{} active). Wait for existing jobs to complete.",
get_max_concurrent_jobs()
),
));
}
let (cancel_tx, cancel_rx) = watch::channel(false);
let mgr = Arc::clone(&manager);
let agent_name = agent_config_owned.name.clone();
let session_id = crate::session::context::current_session_id();
let handle = tokio::spawn(async move {
let run = async move {
let output = match run_dynamic_agent_in_process(
&agent_config_owned,
&task_owned,
&merged_config,
cancel_rx,
)
.await
{
Ok(text) => text,
Err(e) => format!("ERROR: {e:#}"),
};
mgr.release(CompletedJob { agent_name, output });
};
if let Some(sid) = session_id {
crate::session::context::with_session_id(sid, run).await;
} else {
run.await;
}
});
manager.register_job(JobHandle {
cancel_tx,
task_handle: handle,
});
return Ok(McpToolResult::success(
tool_name,
tool_id,
"Agent task started asynchronously. The result will be injected into this conversation automatically when ready.".to_string(),
));
}
let (_cancel_tx, cancel_rx) = watch::channel(false);
match run_dynamic_agent_in_process(&agent_config_owned, &task_owned, &merged_config, cancel_rx)
.await
{
Ok(output) => Ok(McpToolResult::success(tool_name, tool_id, output)),
Err(e) => Ok(McpToolResult::error(
tool_name,
tool_id,
format!("Agent execution failed: {e:#}"),
)),
}
}
fn build_agent_config(
agent: &crate::mcp::core::dynamic_agents::DynamicAgentConfig,
base_config: &crate::config::Config,
) -> crate::config::Config {
let mut merged = base_config.clone();
if !agent.server_refs.is_empty() {
let dynamic_servers = crate::mcp::core::dynamic::get_all_configs();
let mut all_servers = base_config.mcp.servers.clone();
for ds in dynamic_servers {
if !all_servers.iter().any(|s| s.name() == ds.name()) {
all_servers.push(ds);
}
}
let role_mcp = crate::config::RoleMcpConfig {
server_refs: agent.server_refs.clone(),
allowed_tools: agent.allowed_tools.clone(),
};
let enabled_servers = role_mcp.get_enabled_servers(&all_servers, None);
crate::log_debug!(
"Dynamic agent '{}' enabling {} servers from server_refs: {:?}",
agent.name,
enabled_servers.len(),
agent.server_refs
);
merged.mcp = crate::config::McpConfig {
servers: enabled_servers,
allowed_tools: agent.allowed_tools.clone(),
};
} else {
merged.mcp.servers.clear();
merged.mcp.allowed_tools.clear();
}
if let Some(ref model) = agent.model {
merged.model = model.clone();
}
merged
}
fn run_dynamic_agent_in_process(
agent: &crate::mcp::core::dynamic_agents::DynamicAgentConfig,
task: &str,
agent_config: &crate::config::Config,
operation_cancelled: watch::Receiver<bool>,
) -> BoxFuture<'static, Result<String>> {
let agent = agent.clone();
let task = task.to_string();
let agent_config = agent_config.clone();
Box::pin(async move {
let agent = &agent;
let task = task.as_str();
let agent_config = &agent_config;
use crate::session::{ChatCompletionWithValidationParams, Message};
if *operation_cancelled.borrow() {
anyhow::bail!("Operation cancelled");
}
let effective_model = agent
.model
.clone()
.unwrap_or_else(|| agent_config.model.clone());
let should_cache = crate::session::model_supports_caching(&effective_model);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let messages = vec![
Message {
role: "system".to_string(),
content: agent.system.clone(),
timestamp: now,
cached: should_cache,
..Default::default()
},
Message {
role: "user".to_string(),
content: task.to_string(),
timestamp: now,
cached: false,
..Default::default()
},
];
let validation_params = ChatCompletionWithValidationParams::new(
&messages,
&effective_model,
agent.temperature.unwrap_or(0.7),
agent.top_p.unwrap_or(0.9),
agent.top_k.unwrap_or(0),
agent_config.get_effective_max_tokens(),
agent_config,
)
.with_max_retries(agent_config.max_retries)
.with_cancellation_token(operation_cancelled.clone());
let response = crate::session::chat_completion_with_validation(validation_params).await?;
if *operation_cancelled.borrow() {
anyhow::bail!("Operation cancelled");
}
let mut current_content = response.content;
let mut current_exchange = response.exchange;
let mut current_tool_calls_param = response.tool_calls;
if !agent.server_refs.is_empty() {
let mut conv_messages = messages.clone();
let layer_cfg = crate::session::layers::layer_trait::LayerConfig {
name: agent.name.clone(),
model: agent.model.clone(),
system_prompt: None,
description: String::new(),
temperature: agent.temperature.unwrap_or(0.7),
top_p: agent.top_p.unwrap_or(0.9),
top_k: agent.top_k.unwrap_or(0),
max_tokens: agent_config.get_effective_max_tokens(),
input_mode: crate::session::layers::layer_trait::InputMode::Last,
output_mode: crate::session::layers::layer_trait::OutputMode::Append,
output_role: crate::session::layers::layer_trait::OutputRole::Assistant,
mcp: crate::session::layers::layer_trait::LayerMcpConfig {
server_refs: agent.server_refs.clone(),
allowed_tools: agent.allowed_tools.clone(),
},
parameters: std::collections::HashMap::new(),
processed_system_prompt: None,
};
loop {
if *operation_cancelled.borrow() {
anyhow::bail!("Operation cancelled");
}
let current_tool_calls = if let Some(calls) = current_tool_calls_param.take() {
if !calls.is_empty() {
calls
} else {
crate::mcp::parse_tool_calls(¤t_content)
}
} else {
crate::mcp::parse_tool_calls(¤t_content)
};
if current_tool_calls.is_empty() {
break;
}
let original_tool_calls =
crate::session::chat::MessageHandler::extract_original_tool_calls(
¤t_exchange,
);
conv_messages.push(Message {
role: "assistant".to_string(),
content: current_content.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
cached: false,
tool_calls: original_tool_calls,
..Default::default()
});
let output_mode = crate::session::output::detect_output_mode(
agent_config
.runtime_output_mode
.as_deref()
.unwrap_or("plain"),
);
let layer_tool_params =
crate::session::chat::response::tool_execution::LayerToolExecutionParams {
tool_calls: current_tool_calls,
session_name: format!("agent_{}", agent.name),
layer_config: layer_cfg.clone(),
layer_name: format!("agent_{}", agent.name),
operation_cancelled: Some(operation_cancelled.clone()),
mode: output_mode,
};
let (tool_results, _tool_time) =
crate::session::chat::response::tool_execution::execute_layer_tool_calls_parallel(
agent_config,
layer_tool_params,
)
.await?;
if *operation_cancelled.borrow() {
anyhow::bail!("Operation cancelled");
}
if tool_results.is_empty() {
break;
}
for tool_result in &tool_results {
let raw_content = if let Some(output) = tool_result.result.get("output") {
if let Some(s) = output.as_str() {
s.to_string()
} else {
serde_json::to_string(output).unwrap_or_default()
}
} else {
serde_json::to_string(&tool_result.result).unwrap_or_default()
};
let (tool_content, _) = crate::utils::truncation::truncate_mcp_response_global(
&raw_content,
agent_config.mcp_response_tokens_threshold,
);
conv_messages.push(Message {
role: "tool".to_string(),
content: tool_content,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
cached: false,
tool_call_id: Some(tool_result.tool_id.clone()),
name: Some(tool_result.tool_name.clone()),
..Default::default()
});
}
let follow_up_params = ChatCompletionWithValidationParams::new(
&conv_messages,
&effective_model,
agent.temperature.unwrap_or(0.7),
agent.top_p.unwrap_or(0.9),
agent.top_k.unwrap_or(0),
agent_config.get_effective_max_tokens(),
agent_config,
)
.with_max_retries(agent_config.max_retries)
.with_cancellation_token(operation_cancelled.clone());
match crate::session::chat_completion_with_validation(follow_up_params).await {
Ok(follow_up) => {
if *operation_cancelled.borrow() {
anyhow::bail!("Operation cancelled");
}
let has_tool_calls = if let Some(ref calls) = follow_up.tool_calls {
!calls.is_empty()
} else {
!crate::mcp::parse_tool_calls(&follow_up.content).is_empty()
};
let should_continue = crate::session::chat::response::tool_result_processor::check_should_continue(
&follow_up,
agent_config,
has_tool_calls,
);
current_content = follow_up.content;
current_exchange = follow_up.exchange;
current_tool_calls_param = follow_up.tool_calls;
if !should_continue {
break;
}
}
Err(e) => {
crate::log_error!(
"Dynamic agent '{}' follow-up API call failed: {}",
agent.name,
e
);
return Err(e);
}
}
}
}
Ok(current_content.trim().to_string())
}) }
async fn run_acp_command(
command: &str,
task: &str,
workdir: &std::path::Path,
mut cancel_rx: watch::Receiver<bool>,
) -> Result<String> {
let mut parts = command.split_whitespace();
let program = parts
.next()
.ok_or_else(|| anyhow::anyhow!("Empty command"))?;
let args: Vec<&str> = parts.collect();
let mut child = Command::new(program)
.args(&args)
.current_dir(workdir)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.spawn()?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("No stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("No stdout"))?;
let mut lines = BufReader::new(stdout).lines();
let msg_line = |msg: Value| format!("{}\n", msg);
stdin
.write_all(
msg_line(json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "0.1.0",
"clientInfo": {"name": "octomind-agent-tool", "version": "1.0"}
}
}))
.as_bytes(),
)
.await?;
wait_for_response(&mut lines, 1).await?;
let cwd_str = workdir.to_string_lossy();
stdin
.write_all(
msg_line(json!({
"jsonrpc": "2.0",
"id": 2,
"method": "session/new",
"params": {"cwd": cwd_str, "mcpServers": []}
}))
.as_bytes(),
)
.await?;
let session_resp = wait_for_response(&mut lines, 2).await?;
let session_id = session_resp
.get("result")
.and_then(|r| r.get("sessionId"))
.and_then(|s| s.as_str())
.ok_or_else(|| anyhow::anyhow!("No sessionId in session/new response"))?
.to_string();
stdin
.write_all(
msg_line(json!({
"jsonrpc": "2.0",
"id": 3,
"method": "session/prompt",
"params": {
"sessionId": session_id,
"prompt": [{"type": "text", "text": task}]
}
}))
.as_bytes(),
)
.await?;
let mut output = String::new();
loop {
if *cancel_rx.borrow() {
let _ = child.kill().await;
return Err(anyhow::anyhow!("Agent task cancelled"));
}
let line = tokio::select! {
line = lines.next_line() => {
match line? {
Some(l) => l,
None => break,
}
}
_ = cancel_rx.changed() => {
let _ = child.kill().await;
return Err(anyhow::anyhow!("Agent task cancelled"));
}
};
if line.trim().is_empty() {
continue;
}
let msg: Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
};
if msg.get("method").and_then(|m| m.as_str()) == Some("session/update") {
if let Some(update) = msg.pointer("/params/update") {
if update.get("sessionUpdate").and_then(|u| u.as_str())
== Some("agent_message_chunk")
{
if let Some(text) = update.pointer("/content/text").and_then(|t| t.as_str()) {
output.push_str(text);
}
}
}
}
if msg.get("id").and_then(|i| i.as_u64()) == Some(3) {
break;
}
}
drop(stdin);
let _ = child.wait().await;
Ok(output.trim().to_string())
}
async fn wait_for_response(
lines: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
id: u64,
) -> Result<Value> {
loop {
let line = match lines.next_line().await? {
Some(l) => l,
None => return Err(anyhow::anyhow!("Subprocess closed before response id={id}")),
};
if line.trim().is_empty() {
continue;
}
let msg: Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
};
if msg.get("id").and_then(|i| i.as_u64()) == Some(id) {
if let Some(err) = msg.get("error") {
return Err(anyhow::anyhow!("ACP error: {err}"));
}
return Ok(msg);
}
}
}