use crate::exec::async_command::{AsyncProcessRunner, ProcessOptions, StreamCaptureConfig};
use crate::exec::sdk_ipc::{ToolIpcHandler, ToolResponse};
use crate::mcp::McpToolExecutor;
use crate::utils::async_utils;
use crate::utils::file_utils::{ensure_dir_exists, write_file_with_context};
use anyhow::{Context, Result};
use hashbrown::HashMap;
use serde_json::Value;
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tracing::{debug, info};
/// Scripting language of a code snippet handled by the executor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Language {
    /// Python 3 source code.
    Python3,
    /// JavaScript source code (run with `node`).
    JavaScript,
}

impl Language {
    /// Returns the lowercase identifier of the language
    /// (`"python3"` or `"javascript"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Python3 => "python3",
            Self::JavaScript => "javascript",
        }
    }

    /// Returns the default interpreter command name for the language.
    pub fn interpreter(&self) -> &'static str {
        match self {
            Self::Python3 => "python3",
            Self::JavaScript => "node",
        }
    }

    /// Picks the Python interpreter to run snippets with.
    ///
    /// Candidates are tried in this order, and the first hit wins:
    /// 1. the interpreter of the virtual environment named by `VIRTUAL_ENV`,
    /// 2. the interpreter of `<workspace_root>/.venv`,
    /// 3. `python3` resolved on `PATH`,
    /// 4. the literal string `"uv"` when `uv` is on `PATH` (the caller is
    ///    expected to special-case this value),
    /// 5. the bare command name `"python3"`.
    ///
    /// Paths are returned lossily converted to UTF-8.
    pub fn detect_python_interpreter(workspace_root: &std::path::Path) -> String {
        // `var_os` keeps non-UTF-8 paths usable; an empty value is treated as
        // unset so it cannot resolve to a relative `bin/python`.
        let active_venv = std::env::var_os("VIRTUAL_ENV").filter(|root| !root.is_empty());
        if let Some(venv_root) = active_venv {
            let venv_bin = Self::venv_python_path(std::path::Path::new(&venv_root));
            if venv_bin.exists() {
                debug!("Using venv Python: {:?}", venv_bin);
                return venv_bin.to_string_lossy().into_owned();
            }
        }

        let workspace_venv = Self::venv_python_path(&workspace_root.join(".venv"));
        if workspace_venv.exists() {
            debug!("Using workspace .venv Python: {:?}", workspace_venv);
            return workspace_venv.to_string_lossy().into_owned();
        }

        if let Ok(system_python) = which::which("python3") {
            debug!("Using system python3: {:?}", system_python);
            return system_python.to_string_lossy().into_owned();
        }

        if which::which("uv").is_ok() {
            debug!("Using uv for Python execution");
            return "uv".to_string();
        }

        debug!("python3 not found on PATH; falling back to bare `python3` command");
        "python3".to_string()
    }

    /// Returns where a virtual environment rooted at `venv_root` keeps its
    /// interpreter: `Scripts/python.exe` on Windows, `bin/python` elsewhere.
    fn venv_python_path(venv_root: &std::path::Path) -> PathBuf {
        if cfg!(windows) {
            venv_root.join("Scripts").join("python.exe")
        } else {
            venv_root.join("bin").join("python")
        }
    }
}
/// Outcome of running one code snippet.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ExecutionResult {
/// Exit code of the interpreter process; `-1` when the process reported none.
pub exit_code: i32,
/// Captured standard output, lossily decoded as UTF-8.
pub stdout: String,
/// Captured standard error, lossily decoded as UTF-8.
pub stderr: String,
/// JSON value found between the `__JSON_RESULT__` and `__END_JSON__` markers
/// in `stdout`; `None` when the markers are absent or the payload is not valid JSON.
pub json_result: Option<Value>,
/// Milliseconds elapsed from the start of `execute` until the process finished.
pub duration_ms: u128,
}
/// Limits applied to a single code execution.
#[derive(Clone, Debug)]
pub struct ExecutionConfig {
    /// Time budget for the interpreter process, in seconds.
    pub timeout_secs: u64,
    /// Capture limit, in bytes, applied to stdout and to stderr individually.
    pub max_output_bytes: usize,
}

impl Default for ExecutionConfig {
    /// Defaults to a 30 second timeout and a 10 MiB capture limit per stream.
    fn default() -> Self {
        const DEFAULT_TIMEOUT_SECS: u64 = 30;
        const DEFAULT_MAX_OUTPUT_BYTES: usize = 10 * 1024 * 1024;

        ExecutionConfig {
            max_output_bytes: DEFAULT_MAX_OUTPUT_BYTES,
            timeout_secs: DEFAULT_TIMEOUT_SECS,
        }
    }
}
/// Runs code snippets in an external interpreter and serves the tool calls
/// they make through the MCP client.
pub struct CodeExecutor {
/// Language of the snippets this executor runs.
language: Language,
/// Client used to list MCP tools and execute tool requests.
mcp_client: Arc<dyn McpToolExecutor>,
/// Timeout and output-capture limits.
config: ExecutionConfig,
/// Workspace directory; used as the process working directory and as the
/// parent of the `.vtcode` scratch directories.
workspace_root: PathBuf,
/// When true, PII protection is enabled on the IPC handler before it serves requests.
enable_pii_protection: bool,
}
impl CodeExecutor {
pub fn new(
language: Language,
mcp_client: Arc<dyn McpToolExecutor>,
workspace_root: PathBuf,
) -> Self {
Self {
language,
mcp_client,
config: ExecutionConfig::default(),
workspace_root,
enable_pii_protection: false,
}
}
pub fn with_config(mut self, config: ExecutionConfig) -> Self {
self.config = config;
self
}
pub fn with_pii_protection(mut self, enabled: bool) -> Self {
self.enable_pii_protection = enabled;
self
}
pub async fn execute(&self, code: &str) -> Result<ExecutionResult> {
info!(
language = self.language.as_str(),
timeout_secs = self.config.timeout_secs,
"Executing code snippet"
);
let start = Instant::now();
let ipc_dir = self.workspace_root.join(".vtcode").join("ipc");
ensure_dir_exists(&ipc_dir).await?;
let sdk = self
.generate_sdk()
.await
.context("failed to generate SDK")?;
let complete_code = match self.language {
Language::Python3 => self.prepare_python_code(&sdk, code)?,
Language::JavaScript => self.prepare_javascript_code(&sdk, code)?,
};
let code_temp_dir = self.workspace_root.join(".vtcode").join("code_temp");
ensure_dir_exists(&code_temp_dir).await?;
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros();
let ext = match self.language {
Language::Python3 => "py",
Language::JavaScript => "js",
};
let code_file = code_temp_dir.join(format!("exec_{}.{}", timestamp, ext));
write_file_with_context(&code_file, &complete_code, "temporary code file").await?;
debug!(
language = self.language.as_str(),
code_file = ?code_file,
"Wrote code to temporary file"
);
let mut env = HashMap::new();
env.insert(
OsString::from("VTCODE_IPC_DIR"),
OsString::from(&*ipc_dir.to_string_lossy()),
);
let mut ipc_handler = ToolIpcHandler::new(ipc_dir.clone());
if self.enable_pii_protection {
ipc_handler.enable_pii_protection()?;
}
let mcp_client = self.mcp_client.clone();
let execution_timeout = Duration::from_secs(self.config.timeout_secs);
let ipc_task: JoinHandle<Result<()>> = tokio::spawn(async move {
let ipc_start = Instant::now();
loop {
let Some(remaining_timeout) = execution_timeout.checked_sub(ipc_start.elapsed())
else {
break;
};
let Some(mut request) = ipc_handler.wait_for_request(remaining_timeout).await?
else {
break;
};
debug!(
tool_name = %request.tool_name,
request_id = %request.id,
"Processing tool request from code"
);
if let Err(e) = ipc_handler.process_request_for_pii(&mut request) {
debug!(error = %e, "PII tokenization failed");
let response = ToolResponse {
id: request.id,
success: false,
result: None,
error: Some(format!("PII processing error: {}", e)),
duration_ms: None,
cache_hit: None,
};
ipc_handler.write_response(response).await?;
continue;
}
let result = match mcp_client
.execute_mcp_tool(&request.tool_name, &request.args)
.await
{
Ok(result) => {
debug!(tool_name = %request.tool_name, "Tool executed successfully");
ToolResponse {
id: request.id,
success: true,
result: Some(result),
error: None,
duration_ms: None,
cache_hit: None,
}
}
Err(e) => {
debug!(
tool_name = %request.tool_name,
error = %e,
"Tool execution failed"
);
ToolResponse {
id: request.id,
success: false,
result: None,
error: Some(e.to_string()),
duration_ms: None,
cache_hit: None,
}
}
};
ipc_handler.write_response(result).await?;
}
Ok(())
});
let (program, args) = match self.language {
Language::Python3 => {
let interpreter = Language::detect_python_interpreter(&self.workspace_root);
if interpreter == "uv" {
(
"uv".to_string(),
vec![
"run".to_string(),
"python".to_string(),
code_file.to_string_lossy().into_owned(),
],
)
} else {
(interpreter, vec![code_file.to_string_lossy().into_owned()])
}
}
Language::JavaScript => (
self.language.interpreter().to_string(),
vec![code_file.to_string_lossy().into_owned()],
),
};
let options = ProcessOptions {
program,
args,
env,
current_dir: Some(self.workspace_root.clone()),
timeout: Some(Duration::from_secs(self.config.timeout_secs)),
cancellation_token: None,
stdout: StreamCaptureConfig {
capture: true,
max_bytes: self.config.max_output_bytes,
},
stderr: StreamCaptureConfig {
capture: true,
max_bytes: self.config.max_output_bytes,
},
};
let process_output = AsyncProcessRunner::run(options)
.await
.context("failed to execute code")?;
let duration_ms = start.elapsed().as_millis();
let stdout = String::from_utf8_lossy(&process_output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&process_output.stderr).into_owned();
let json_result = self.extract_json_result(&stdout, self.language)?;
let _ = tokio::fs::remove_file(&code_file).await;
let _ = tokio::fs::remove_dir_all(&ipc_dir).await;
let ipc_result =
async_utils::with_timeout(ipc_task, Duration::from_secs(1), "IPC handler task").await;
if let Err(e) = ipc_result {
debug!(error = %e, "IPC handler did not complete in time");
}
debug!(
exit_code = process_output.exit_status.code().unwrap_or(-1),
duration_ms,
has_json_result = json_result.is_some(),
"Code execution completed"
);
Ok(ExecutionResult {
exit_code: process_output.exit_status.code().unwrap_or(-1),
stdout,
stderr,
json_result,
duration_ms,
})
}
/// Assembles the complete Python script: the SDK, the user code, and an
/// epilogue that prints a global `result`, if one exists, as JSON between the
/// `__JSON_RESULT__` and `__END_JSON__` markers.
fn prepare_python_code(&self, sdk: &str, user_code: &str) -> Result<String> {
    const USER_CODE_HEADER: &str = "\n\n# User code\n";
    const RESULT_EPILOGUE: &str = "\n\n# Capture result\nimport json\nif 'result' in dir():\n print('__JSON_RESULT__')\n print(json.dumps(result, default=str))\n print('__END_JSON__')";

    let script = [sdk, USER_CODE_HEADER, user_code, RESULT_EPILOGUE].concat();
    Ok(script)
}
/// Assembles the complete JavaScript script: the SDK followed by the user
/// code wrapped in an async IIFE whose tail prints `result`, when defined, as
/// JSON between the `__JSON_RESULT__` and `__END_JSON__` markers.
fn prepare_javascript_code(&self, sdk: &str, user_code: &str) -> Result<String> {
    const WRAPPER_OPEN: &str = "\n\n// User code\n(async () => {\n";
    const WRAPPER_CLOSE: &str = "\n\n// Capture result\nif (typeof result !== 'undefined') {\n console.log('__JSON_RESULT__');\n console.log(JSON.stringify(result, null, 2));\n console.log('__END_JSON__');\n}\n})();\n";

    let script = [sdk, WRAPPER_OPEN, user_code, WRAPPER_CLOSE].concat();
    Ok(script)
}
/// Pulls the JSON payload printed between the first `__JSON_RESULT__` marker
/// and the next `__END_JSON__` marker out of `stdout`.
///
/// Returns `Ok(None)` when either marker is missing or the text between them
/// is not valid JSON; this function never returns an error.
fn extract_json_result(&self, stdout: &str, _language: Language) -> Result<Option<Value>> {
    const START_MARKER: &str = "__JSON_RESULT__";
    const END_MARKER: &str = "__END_JSON__";

    let Some((_, after_start)) = stdout.split_once(START_MARKER) else {
        return Ok(None);
    };
    let Some((payload, _)) = after_start.split_once(END_MARKER) else {
        return Ok(None);
    };

    let parsed = serde_json::from_str::<Value>(payload.trim())
        .map(|value| {
            debug!("Extracted JSON result from code output");
            value
        })
        .map_err(|e| {
            debug!(error = %e, "Failed to parse JSON result");
        })
        .ok();
    Ok(parsed)
}
/// Generates the tool SDK source for the executor's language.
///
/// # Errors
///
/// Propagates any failure from the language-specific generator.
pub async fn generate_sdk(&self) -> Result<String> {
    let sdk = match self.language {
        Language::JavaScript => self.generate_javascript_sdk().await?,
        Language::Python3 => self.generate_python_sdk().await?,
    };
    Ok(sdk)
}
async fn generate_python_sdk(&self) -> Result<String> {
debug!("Generating Python SDK for MCP tools");
let tools = self
.mcp_client
.list_mcp_tools()
.await
.context("failed to list MCP tools")?;
let mut sdk = String::from(
r#"# MCP Tools SDK - Auto-generated
import json
import sys
import os
import time
from typing import Any, Dict, Optional
from uuid import uuid4
class MCPTools:
"""Interface to MCP tools from agent code via file-based IPC."""
IPC_DIR = os.environ.get("VTCODE_IPC_DIR", "/tmp/vtcode_ipc")
def __init__(self):
self._call_count = 0
self._results = []
os.makedirs(self.IPC_DIR, exist_ok=True)
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
"""Call an MCP tool via file-based IPC."""
request_id = str(uuid4())
# Write request
request = {
"id": request_id,
"tool_name": name,
"args": args
}
request_file = os.path.join(self.IPC_DIR, "request.json")
with open(request_file, 'w') as f:
json.dump(request, f)
# Wait for response
response_file = os.path.join(self.IPC_DIR, "response.json")
timeout = 30
start = time.time()
while time.time() - start < timeout:
if os.path.exists(response_file):
with open(response_file, 'r') as f:
response = json.load(f)
if response.get("id") == request_id:
# Clean up response
try:
os.remove(response_file)
except:
pass
if response.get("success"):
return response.get("result")
else:
raise RuntimeError(f"Tool error: {response.get('error', 'unknown error')}")
time.sleep(0.1)
raise TimeoutError(f"Tool '{name}' timed out after {timeout}s")
def log(self, message: str) -> None:
"""Log a message that will be captured."""
print(f"[LOG] {message}")
# Initialize tools interface
mcp = MCPTools()
"#,
);
for tool in tools {
sdk.push_str(&format!(
"\ndef {}(**kwargs):\n \"\"\"{}.\"\"\"\n return mcp._call_tool('{}', kwargs)\n\n",
sanitize_function_name(&tool.name), tool.description, tool.name
));
}
Ok(sdk)
}
async fn generate_javascript_sdk(&self) -> Result<String> {
debug!("Generating JavaScript SDK for MCP tools");
let tools = self
.mcp_client
.list_mcp_tools()
.await
.context("failed to list MCP tools")?;
let mut sdk = String::from(
r#"// MCP Tools SDK - Auto-generated
const fs = require('fs');
const path = require('path');
const { v4: uuid4 } = require('uuid');
class MCPTools {
constructor() {
this.callCount = 0;
this.results = [];
this.ipcDir = process.env.VTCODE_IPC_DIR || '/tmp/vtcode_ipc';
if (!fs.existsSync(this.ipcDir)) {
fs.mkdirSync(this.ipcDir, { recursive: true });
}
}
async callTool(name, args = {}) {
const requestId = uuid4();
const request = {
id: requestId,
tool_name: name,
args: args
};
const requestFile = path.join(this.ipcDir, 'request.json');
fs.writeFileSync(requestFile, JSON.stringify(request, null, 2));
// Wait for response
const responseFile = path.join(this.ipcDir, 'response.json');
const timeout = 30000; // 30s
const start = Date.now();
while (Date.now() - start < timeout) {
try {
if (fs.existsSync(responseFile)) {
const response = JSON.parse(fs.readFileSync(responseFile, 'utf-8'));
if (response.id === requestId) {
// Clean up response
try {
fs.unlinkSync(responseFile);
} catch (e) {}
if (response.success) {
return response.result;
} else {
throw new Error(`Tool error: ${response.error || 'unknown error'}`);
}
}
}
} catch (e) {
if (e.code !== 'ENOENT') throw e;
}
await new Promise(r => setTimeout(r, 100));
}
throw new Error(`Tool '${name}' timed out after ${timeout}ms`);
}
log(message) {
console.log(`[LOG] ${message}`);
}
}
const mcp = new MCPTools();
"#,
);
for tool in tools {
sdk.push_str(&format!(
"async function {}(args = {{}}) {{\n // {}\n return await mcp.callTool('{}', args);\n}}\n\n",
sanitize_function_name(&tool.name), tool.description, tool.name
));
}
Ok(sdk)
}
pub fn workspace_root(&self) -> &PathBuf {
&self.workspace_root
}
pub fn mcp_client(&self) -> &Arc<dyn McpToolExecutor> {
&self.mcp_client
}
}
/// Turns a tool name into an identifier usable as a function name in the
/// generated SDKs.
///
/// Every character other than an ASCII letter, ASCII digit or `_` is replaced
/// with `_`. Because an identifier can neither be empty nor start with a
/// digit, a `_` is prepended in those two cases.
fn sanitize_function_name(name: &str) -> String {
    let mut sanitized: String = name
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect();

    let needs_prefix = match sanitized.chars().next() {
        None => true,
        Some(first) => first.is_ascii_digit(),
    };
    if needs_prefix {
        sanitized.insert(0, '_');
    }
    sanitized
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mcp::{McpClientStatus, McpToolExecutor, McpToolInfo};
    use async_trait::async_trait;
    use serde_json::{Value, json};
    use std::path::PathBuf;
    use std::sync::Arc;

    /// MCP client stand-in that exposes a single `read_file` tool and
    /// answers every tool call with an empty JSON object.
    struct MockMcpToolExecutor;

    #[async_trait]
    impl McpToolExecutor for MockMcpToolExecutor {
        async fn execute_mcp_tool(&self, _tool_name: &str, _args: &Value) -> Result<Value> {
            Ok(json!({}))
        }

        async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
            let read_file = McpToolInfo {
                name: "read_file".to_string(),
                description: "Read a file".to_string(),
                provider: "test".to_string(),
                input_schema: json!({}),
            };
            Ok(vec![read_file])
        }

        async fn has_mcp_tool(&self, _tool_name: &str) -> Result<bool> {
            Ok(true)
        }

        fn get_status(&self) -> McpClientStatus {
            McpClientStatus {
                enabled: true,
                provider_count: 1,
                active_connections: 1,
                configured_providers: vec!["test".to_string()],
            }
        }
    }

    /// Builds an executor backed by the mock client and rooted at `/workspace`.
    fn test_executor(language: Language) -> CodeExecutor {
        let client: Arc<dyn McpToolExecutor> = Arc::new(MockMcpToolExecutor);
        CodeExecutor::new(language, client, PathBuf::from("/workspace"))
    }

    /// Generates the SDK for `language` and checks that it reads the IPC
    /// directory variable and never mentions `VTCODE_WORKSPACE`.
    async fn assert_sdk_uses_ipc_dir_only(language: Language, failure_message: &str) {
        let sdk = test_executor(language)
            .generate_sdk()
            .await
            .expect(failure_message);
        assert!(sdk.contains("VTCODE_IPC_DIR"));
        assert!(!sdk.contains("VTCODE_WORKSPACE"));
    }

    #[test]
    fn sanitize_function_name_handles_special_chars() {
        let cases = [
            ("read_file", "read_file"),
            ("read-file", "read_file"),
            ("read.file", "read_file"),
            ("readFile123", "readFile123"),
        ];
        for (input, expected) in cases {
            assert_eq!(sanitize_function_name(input), expected);
        }
    }

    #[test]
    fn language_as_str() {
        let expectations = [
            (Language::Python3, "python3"),
            (Language::JavaScript, "javascript"),
        ];
        for (language, expected) in expectations {
            assert_eq!(language.as_str(), expected);
        }
    }

    #[test]
    fn language_interpreter() {
        let expectations = [
            (Language::Python3, "python3"),
            (Language::JavaScript, "node"),
        ];
        for (language, expected) in expectations {
            assert_eq!(language.interpreter(), expected);
        }
    }

    #[tokio::test]
    async fn python_sdk_uses_ipc_dir_only() {
        assert_sdk_uses_ipc_dir_only(Language::Python3, "python sdk should generate").await;
    }

    #[tokio::test]
    async fn javascript_sdk_uses_ipc_dir_only() {
        assert_sdk_uses_ipc_dir_only(Language::JavaScript, "javascript sdk should generate")
            .await;
    }
}