use serde_json::Value;
use std::io::Write;
use std::process::{Command, Stdio};
use std::time::Duration;
const DEFAULT_TIMEOUT_SECS: u64 = 10;
const MAX_TIMEOUT_SECS: u64 = 60;
const MAX_OUTPUT_BYTES: usize = 16_384;
pub async fn execute(args: &Value) -> Result<String, String> {
let language = args
.get("language")
.and_then(|v| v.as_str())
.unwrap_or("javascript")
.to_lowercase();
let code = args
.get("code")
.and_then(|v| v.as_str())
.ok_or("Missing required argument: 'code'")?;
let timeout_secs = args
.get("timeout_seconds")
.and_then(|v| v.as_u64())
.unwrap_or(DEFAULT_TIMEOUT_SECS)
.min(MAX_TIMEOUT_SECS);
match language.as_str() {
"javascript" | "typescript" | "js" | "ts" => run_deno(code, timeout_secs),
"python" | "python3" | "py" => run_python(code, timeout_secs),
other => Err(format!(
"Unsupported language: '{}'. Supported: javascript, typescript, python.",
other
)),
}
}
fn run_deno(code: &str, timeout_secs: u64) -> Result<String, String> {
let deno = find_deno().ok_or_else(|| {
"Deno not found. Hematite checks (in order): settings.json `deno_path`, \
LM Studio's bundled copy (~/.lmstudio/.internal/utils/deno.exe), system PATH. \
To install Deno globally: `winget install DenoLand.Deno` (Windows) or see https://deno.com. \
Or set `deno_path` in .hematite/settings.json to point to any Deno binary."
.to_string()
})?;
let mut child = Command::new(&deno)
.args([
"run",
"--allow-read=.", "--allow-write=.", "--deny-net", "--deny-sys", "--deny-env", "--deny-run", "--deny-ffi", "--no-prompt", "-", ])
.env("NO_COLOR", "true") .stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| format!("Failed to spawn Deno: {e}"))?;
write_stdin(&mut child, code)?;
collect_output(child, "deno", timeout_secs)
}
fn run_python(code: &str, timeout_secs: u64) -> Result<String, String> {
let python = find_python().ok_or_else(|| {
"Python not found. Hematite checks (in order): settings.json `python_path`, \
system PATH (python3, python), and 'py' launcher on Windows. \
To install Python: https://python.org or `winget install Python.Python.3`. \
Or set `python_path` in .hematite/settings.json to point to any Python binary."
.to_string()
})?;
let mut cmd = Command::new(&python);
if python == "py" {
cmd.arg("-3");
}
let mut builder = cmd.args([
"-c",
&wrap_python(code),
]);
builder = builder
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env_clear()
.env("PYTHONDONTWRITEBYTECODE", "1")
.env("PYTHONIOENCODING", "utf-8");
#[cfg(target_os = "windows")]
{
if let Ok(v) = std::env::var("SYSTEMROOT") {
builder = builder.env("SYSTEMROOT", v);
}
if let Ok(v) = std::env::var("TEMP") {
builder = builder.env("TEMP", v);
}
if let Ok(v) = std::env::var("TMP") {
builder = builder.env("TMP", v);
}
}
#[cfg(not(target_os = "windows"))]
{
if let Ok(v) = std::env::var("TMPDIR") {
builder = builder.env("TMPDIR", v);
}
}
let child = builder
.spawn()
.map_err(|e| format!("Failed to spawn Python: {e}"))?;
collect_output(child, "python", timeout_secs)
}
fn wrap_python(code: &str) -> String {
format!(
r#"
import sys
# Block network access
import socket as _socket
_socket.socket = None # type: ignore
# Block subprocess and os.system
import os as _os
_os.system = lambda *a, **k: (_ for _ in ()).throw(PermissionError("os.system blocked in sandbox"))
_popen_orig = _os.popen
_os.popen = lambda *a, **k: (_ for _ in ()).throw(PermissionError("os.popen blocked in sandbox"))
# Block __import__ for subprocess
_real_import = __builtins__.__import__ if hasattr(__builtins__, '__import__') else __import__
def _safe_import(name, *args, **kwargs):
if name in ('subprocess', 'multiprocessing', 'pty', 'telnetlib', 'ftplib', 'smtplib', 'http', 'urllib', 'requests', 'httpx'):
raise ImportError(f"Module '{{name}}' is blocked in the sandbox.")
return _real_import(name, *args, **kwargs)
import builtins
builtins.__import__ = _safe_import
# ── Math / data science prelude — always available, no import needed ──────────
import math
import statistics
import datetime
import json
import re
import decimal
import fractions
import itertools
from collections import Counter, defaultdict, OrderedDict
from functools import reduce
# Optional: numpy / pandas — available if installed on the host
try:
import numpy as np
import pandas as pd
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# Run the actual code
try:
exec(compile(r"""{code}""", "<sandbox>", "exec"))
except Exception as e:
print(f"\nSandbox Execution Error: {{e}}", file=sys.stderr)
sys.exit(1)
"#,
code = code.replace(r#"""""#, r#"\" \" \""#)
)
}
fn write_stdin(child: &mut std::process::Child, code: &str) -> Result<(), String> {
let mut stdin = child
.stdin
.take()
.ok_or("Failed to open stdin for sandbox process")?;
stdin
.write_all(code.as_bytes())
.map_err(|e| format!("Failed to write to sandbox stdin: {e}"))?;
drop(stdin); Ok(())
}
fn collect_output(
child: std::process::Child,
runtime: &str,
timeout_secs: u64,
) -> Result<String, String> {
let timeout = Duration::from_secs(timeout_secs);
let start = std::time::Instant::now();
let output = std::thread::scope(|s| {
let handle = s.spawn(|| child.wait_with_output());
loop {
if start.elapsed() >= timeout {
return Err(format!(
"Sandbox timeout: process exceeded {}s and was killed.",
timeout_secs
));
}
std::thread::sleep(Duration::from_millis(50));
if handle.is_finished() {
return handle
.join()
.map_err(|_| "Sandbox thread panicked.".to_string())?
.map_err(|e| format!("Failed to collect {runtime} output: {e}"));
}
}
})?;
let stdout = truncate(
&String::from_utf8_lossy(&output.stdout),
MAX_OUTPUT_BYTES / 2,
);
let stderr = truncate(
&String::from_utf8_lossy(&output.stderr),
MAX_OUTPUT_BYTES / 2,
);
if output.status.success() {
if stdout.trim().is_empty() && stderr.trim().is_empty() {
Ok("(no output)".to_string())
} else if stderr.trim().is_empty() {
Ok(stdout)
} else {
Ok(format!("{stdout}\n[stderr]\n{stderr}"))
}
} else {
let code = output
.status
.code()
.map(|c| c.to_string())
.unwrap_or_else(|| "?".to_string());
Err(format!(
"Exit code {code}\n{}\n{}",
stdout.trim(),
stderr.trim()
))
}
}
fn truncate(s: &str, max: usize) -> String {
if s.len() <= max {
s.to_string()
} else {
format!("{}\n... [truncated]", &s[..max])
}
}
fn find_deno() -> Option<String> {
let config = crate::agent::config::load_config();
if let Some(path) = config.deno_path {
if std::path::Path::new(&path).exists() {
return Some(path);
}
}
let exe = if cfg!(windows) { "deno.exe" } else { "deno" };
if let Ok(home) = std::env::var("USERPROFILE").or_else(|_| std::env::var("HOME")) {
let standard = std::path::Path::new(&home)
.join(".deno")
.join("bin")
.join(exe);
if standard.exists() {
return Some(standard.to_string_lossy().into_owned());
}
}
if cfg!(windows) {
if let Ok(local_app) = std::env::var("LOCALAPPDATA") {
let winget_base = std::path::Path::new(&local_app)
.join("Microsoft")
.join("WinGet")
.join("Packages");
if let Ok(entries) = std::fs::read_dir(&winget_base) {
for entry in entries.flatten() {
let name = entry.file_name();
if name.to_string_lossy().starts_with("DenoLand.Deno") {
let candidate = entry.path().join("deno.exe");
if candidate.exists() {
return Some(candidate.to_string_lossy().into_owned());
}
}
}
}
}
}
let check = if cfg!(windows) {
Command::new("where").arg("deno").output()
} else {
Command::new("which").arg("deno").output()
};
if let Ok(out) = check {
if out.status.success() {
let resolved = String::from_utf8_lossy(&out.stdout)
.trim()
.lines()
.next()
.unwrap_or("deno")
.to_string();
return Some(resolved);
}
}
find_lmstudio_deno()
}
fn find_python() -> Option<String> {
let config = crate::agent::config::load_config();
if let Some(path) = config.python_path {
if std::path::Path::new(&path).exists() {
return Some(path);
}
}
find_executable(&["python3", "python", "py"])
}
fn find_executable(candidates: &[&str]) -> Option<String> {
for name in candidates {
let check = if cfg!(windows) {
Command::new("where").arg(name).output()
} else {
Command::new("which").arg(name).output()
};
if check.map(|o| o.status.success()).unwrap_or(false) {
return Some(name.to_string());
}
}
None
}
fn find_lmstudio_deno() -> Option<String> {
let home = std::env::var("USERPROFILE")
.or_else(|_| std::env::var("HOME"))
.ok()?;
let exe = if cfg!(windows) { "deno.exe" } else { "deno" };
let path = std::path::Path::new(&home)
.join(".lmstudio")
.join(".internal")
.join("utils")
.join(exe);
if path.exists() {
Some(path.to_string_lossy().into_owned())
} else {
None
}
}