use std::sync::{Arc, OnceLock};
use kavach::{Backend, ExecResult, Sandbox, SandboxConfig, SandboxPolicy, SandboxState};
use crate::dispatch::{Dispatcher, ToolHandler};
use crate::error::BoteError;
use crate::events::EventSink;
use crate::registry::ToolDef;
use crate::stream::{StreamContext, StreamingToolHandler};
static FALLBACK_RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
fn runtime_handle() -> tokio::runtime::Handle {
match tokio::runtime::Handle::try_current() {
Ok(h) => h,
Err(_) => FALLBACK_RT
.get_or_init(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("BUG: failed to create fallback tokio runtime")
})
.handle()
.clone(),
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ToolSandboxConfig {
pub backend: Backend,
pub policy: SandboxPolicy,
pub timeout_ms: u64,
pub env: Vec<(String, String)>,
}
impl ToolSandboxConfig {
#[must_use]
pub fn new(backend: Backend, policy: SandboxPolicy, timeout_ms: u64) -> Self {
Self {
backend,
policy,
timeout_ms,
env: Vec::new(),
}
}
#[must_use]
pub fn basic() -> Self {
Self::new(Backend::Process, SandboxPolicy::basic(), 30_000)
}
#[must_use]
pub fn strict() -> Self {
Self::new(Backend::Process, SandboxPolicy::strict(), 10_000)
}
#[must_use]
pub fn noop() -> Self {
Self::new(Backend::Noop, SandboxPolicy::minimal(), 30_000)
}
#[must_use]
pub fn with_env(mut self, env: Vec<(String, String)>) -> Self {
self.env = env;
self
}
#[must_use]
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = timeout_ms;
self
}
}
pub struct SandboxExecutor {
default_config: ToolSandboxConfig,
events: Option<Arc<dyn EventSink>>,
}
impl SandboxExecutor {
#[must_use]
pub fn new(default_config: ToolSandboxConfig) -> Self {
Self {
default_config,
events: None,
}
}
#[must_use]
pub fn with_events(mut self, events: Arc<dyn EventSink>) -> Self {
self.events = Some(events);
self
}
pub fn execute(
&self,
tool_name: &str,
command: &str,
args: &serde_json::Value,
config: Option<&ToolSandboxConfig>,
) -> crate::Result<serde_json::Value> {
let cfg = config.unwrap_or(&self.default_config);
let handle = runtime_handle();
handle.block_on(self.execute_async(tool_name, command, args, cfg))
}
async fn execute_async(
&self,
tool_name: &str,
command: &str,
args: &serde_json::Value,
config: &ToolSandboxConfig,
) -> crate::Result<serde_json::Value> {
let sandbox_config = SandboxConfig::builder()
.backend(config.backend)
.policy(config.policy.clone())
.timeout_ms(config.timeout_ms)
.build();
let mut sandbox =
Sandbox::create(sandbox_config)
.await
.map_err(|e| BoteError::SandboxError {
tool: tool_name.into(),
reason: format!("failed to create sandbox: {e}"),
})?;
self.publish_event(
crate::events::TOPIC_SANDBOX_CREATED,
serde_json::json!({"tool_name": tool_name, "backend": format!("{:?}", config.backend)}),
);
sandbox
.transition(SandboxState::Running)
.map_err(|e| BoteError::SandboxError {
tool: tool_name.into(),
reason: format!("failed to start sandbox: {e}"),
})?;
let json_args = serde_json::to_string(args).unwrap_or_default();
let full_command = format!("echo '{}' | {}", json_args.replace('\'', "'\\''"), command);
let result: ExecResult =
sandbox
.exec(&full_command)
.await
.map_err(|e| BoteError::SandboxError {
tool: tool_name.into(),
reason: format!("sandbox exec failed: {e}"),
})?;
sandbox.destroy().await.map_err(|e| {
tracing::warn!(tool = tool_name, error = %e, "sandbox destroy failed");
BoteError::SandboxError {
tool: tool_name.into(),
reason: format!("sandbox destroy failed: {e}"),
}
})?;
self.publish_event(
crate::events::TOPIC_SANDBOX_DESTROYED,
serde_json::json!({
"tool_name": tool_name,
"exit_code": result.exit_code,
"duration_ms": result.duration_ms,
"timed_out": result.timed_out,
}),
);
if result.timed_out {
return Err(BoteError::SandboxError {
tool: tool_name.into(),
reason: "execution timed out".into(),
});
}
if result.exit_code != 0 {
let msg = if result.stderr.is_empty() {
format!("exit code {}", result.exit_code)
} else {
result.stderr.trim().to_string()
};
return Err(BoteError::SandboxError {
tool: tool_name.into(),
reason: msg,
});
}
let stdout = result.stdout.trim();
match serde_json::from_str(stdout) {
Ok(v) => Ok(v),
Err(_) => Ok(serde_json::json!({
"content": [{"type": "text", "text": stdout}]
})),
}
}
fn publish_event(&self, topic: &str, payload: serde_json::Value) {
if let Some(events) = &self.events {
events.publish(topic, payload);
}
}
}
#[must_use]
pub fn wrap_command(command: impl Into<String>, config: ToolSandboxConfig) -> ToolHandler {
let command = command.into();
let executor = Arc::new(SandboxExecutor::new(config));
Arc::new(move |args: serde_json::Value| -> serde_json::Value {
match executor.execute("", &command, &args, None) {
Ok(result) => result,
Err(e) => {
tracing::error!(error = %e, "sandboxed tool execution failed");
serde_json::json!({
"content": [{"type": "text", "text": e.to_string()}],
"isError": true
})
}
}
})
}
#[must_use]
pub fn wrap_streaming_command(
command: impl Into<String>,
config: ToolSandboxConfig,
) -> StreamingToolHandler {
let command = command.into();
let executor = Arc::new(SandboxExecutor::new(config));
Arc::new(
move |args: serde_json::Value, ctx: StreamContext| -> serde_json::Value {
ctx.progress.report_msg(1, 3, "creating sandbox");
if ctx.cancellation.is_cancelled() {
return serde_json::json!({"content": [{"type": "text", "text": "cancelled"}], "isError": true});
}
ctx.progress.report_msg(2, 3, "executing command");
let result = executor.execute("", &command, &args, None);
ctx.progress.report_msg(3, 3, "sandbox complete");
match result {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "sandboxed streaming tool execution failed");
serde_json::json!({
"content": [{"type": "text", "text": e.to_string()}],
"isError": true
})
}
}
},
)
}
impl Dispatcher {
pub fn register_sandboxed_tool(
&self,
tool: ToolDef,
command: impl Into<String>,
config: ToolSandboxConfig,
) -> crate::Result<()> {
let handler = wrap_command(command, config);
self.register_tool(tool, handler)
}
pub fn register_sandboxed_streaming_tool(
&self,
tool: ToolDef,
command: impl Into<String>,
config: ToolSandboxConfig,
) -> crate::Result<()> {
let handler = wrap_streaming_command(command, config);
self.register_streaming_tool(tool, handler)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::registry::{ToolRegistry, ToolSchema};
use std::collections::HashMap;
fn noop_tool(name: &str) -> ToolDef {
ToolDef {
name: name.into(),
description: format!("{name} sandboxed tool"),
input_schema: ToolSchema {
schema_type: "object".into(),
properties: HashMap::new(),
required: vec![],
},
version: None,
deprecated: None,
annotations: None,
}
}
#[test]
fn config_noop_preset() {
let cfg = ToolSandboxConfig::noop();
assert_eq!(cfg.backend, Backend::Noop);
assert_eq!(cfg.timeout_ms, 30_000);
}
#[test]
fn config_basic_preset() {
let cfg = ToolSandboxConfig::basic();
assert_eq!(cfg.backend, Backend::Process);
assert_eq!(cfg.timeout_ms, 30_000);
}
#[test]
fn config_strict_preset() {
let cfg = ToolSandboxConfig::strict();
assert_eq!(cfg.backend, Backend::Process);
assert_eq!(cfg.timeout_ms, 10_000);
}
#[test]
fn config_with_env() {
let cfg = ToolSandboxConfig::noop().with_env(vec![("KEY".into(), "val".into())]);
assert_eq!(cfg.env.len(), 1);
assert_eq!(cfg.env[0].0, "KEY");
}
#[test]
fn config_with_timeout() {
let cfg = ToolSandboxConfig::noop().with_timeout(5000);
assert_eq!(cfg.timeout_ms, 5000);
}
#[test]
fn executor_creation() {
let executor = SandboxExecutor::new(ToolSandboxConfig::noop());
assert!(executor.events.is_none());
}
#[test]
fn executor_with_events() {
let executor = SandboxExecutor::new(ToolSandboxConfig::noop()).with_events(Arc::new(()));
assert!(executor.events.is_some());
}
#[test]
fn executor_execute_noop() {
let executor = SandboxExecutor::new(ToolSandboxConfig::noop());
let result = executor.execute(
"test_tool",
"echo '{\"ok\": true}'",
&serde_json::json!({}),
None,
);
assert!(result.is_ok());
}
#[test]
fn wrap_command_produces_handler() {
let handler = wrap_command("echo test", ToolSandboxConfig::noop());
let result = handler(serde_json::json!({}));
assert!(result.is_object());
}
#[test]
fn wrap_streaming_command_produces_handler() {
let handler = wrap_streaming_command("echo test", ToolSandboxConfig::noop());
let (ctx, rx) = crate::stream::make_stream_context();
let result = handler(serde_json::json!({}), ctx);
let mut updates = vec![];
while let Ok(u) = rx.try_recv() {
updates.push(u);
}
assert_eq!(updates.len(), 3);
assert!(result.is_object());
}
#[test]
fn register_sandboxed_tool_dispatches() {
let reg = ToolRegistry::new();
let d = Dispatcher::new(reg);
d.register_sandboxed_tool(
noop_tool("sandbox_echo"),
"echo test",
ToolSandboxConfig::noop(),
)
.unwrap();
let req = crate::protocol::JsonRpcRequest::new(1, "tools/call")
.with_params(serde_json::json!({"name": "sandbox_echo", "arguments": {}}));
let resp = d.dispatch(&req).unwrap();
assert!(resp.result.is_some());
assert!(resp.error.is_none());
}
#[test]
fn register_sandboxed_streaming_tool_dispatches() {
let reg = ToolRegistry::new();
let d = Dispatcher::new(reg);
d.register_sandboxed_streaming_tool(
noop_tool("sandbox_stream"),
"echo test",
ToolSandboxConfig::noop(),
)
.unwrap();
assert!(d.is_streaming_tool("sandbox_stream"));
}
#[test]
fn sandbox_types_are_send_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<ToolSandboxConfig>();
assert_sync::<ToolSandboxConfig>();
assert_send::<SandboxExecutor>();
assert_sync::<SandboxExecutor>();
}
}