use anyhow::{Context, Result, bail};
use async_trait::async_trait;
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicySpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
use kube::api::{Api, DeleteParams, PostParams};
use kube::config::{KubeConfigOptions, Kubeconfig};
use kube::{Client, Config as KubeConfig};
use std::collections::{BTreeMap, HashMap};
use tokio::io::AsyncReadExt;
use super::{BackendType, ExecResult, Sandbox, SandboxConfig};
use crate::config::OrchestratorConfig;
/// A sandbox implemented as a single-container Kubernetes pod.
///
/// The pod runs a hardened container (non-root, all capabilities dropped,
/// no privilege escalation) kept alive with `sleep infinity`; commands are
/// injected via the Kubernetes exec API. When networking is disabled a
/// deny-all NetworkPolicy is created alongside the pod.
pub struct KubernetesSandbox {
    // Logical sandbox name; the pod name is derived from it (see pod_name_for).
    name: String,
    // Namespace the pod (and any NetworkPolicy) lives in.
    namespace: String,
    // Set once the pod has been created in start(); rebuilt lazily elsewhere.
    pod_name: Option<String>,
    // Tracks the start()/stop() lifecycle, not live cluster state.
    running: bool,
    // Lazily built K8s client; None until start()/exec first needs it.
    client: Option<Client>,
    // Optional RuntimeClass (e.g. gVisor/Kata) for stronger isolation.
    runtime_class: Option<String>,
    // Optional ServiceAccount assigned to the pod.
    service_account: Option<String>,
    // Constrains pod scheduling to nodes matching these labels.
    node_selector: HashMap<String, String>,
    // True when we created a deny-all NetworkPolicy that stop() must remove.
    network_policy_created: bool,
    // True when the sandbox was started with networking disabled.
    network_disabled: bool,
}
impl KubernetesSandbox {
pub fn new(name: &str, config: &OrchestratorConfig) -> Self {
Self {
name: name.to_string(),
namespace: config.namespace.clone(),
pod_name: None,
running: false,
client: None,
runtime_class: config.runtime_class.clone(),
service_account: config.service_account.clone(),
node_selector: config.node_selector.clone(),
network_policy_created: false,
network_disabled: false,
}
}
/// Build a Kubernetes client, preferring in-cluster configuration.
///
/// Resolution order: (1) in-cluster service-account config, (2) the
/// kubeconfig file named in `config.kubeconfig` (tilde-expanded),
/// (3) the default kubeconfig (`$KUBECONFIG` / `~/.kube/config`).
/// An explicit `config.context` is honored for cases (2) and (3).
async fn build_client(config: &OrchestratorConfig) -> Result<Client> {
    // NOTE: this inner `config` shadows the function parameter within the if-let.
    if let Ok(config) = KubeConfig::incluster() {
        return Client::try_from(config).context("Failed to create in-cluster K8s client");
    }
    let kubeconfig = if let Some(ref path) = config.kubeconfig {
        let expanded = tilde_expand(path);
        Kubeconfig::read_from(expanded).context("Failed to read kubeconfig")?
    } else {
        Kubeconfig::read().context("Failed to read default kubeconfig")?
    };
    let mut options = KubeConfigOptions::default();
    if let Some(ref ctx) = config.context {
        options.context = Some(ctx.clone());
    }
    let kube_config = KubeConfig::from_custom_kubeconfig(kubeconfig, &options)
        .await
        .context("Failed to build K8s config from kubeconfig")?;
    Client::try_from(kube_config).context("Failed to create K8s client")
}
/// Derive a DNS-1123-safe pod name from a sandbox name.
///
/// Lowercases the name, replaces every character that is not an ASCII
/// alphanumeric or `-` with `-`, prefixes `agentkernel-`, caps the result
/// at the 253-character object-name limit, and strips trailing hyphens
/// (a DNS subdomain must end with an alphanumeric).
///
/// Fixes over the previous version: `char::is_alphanumeric` admitted
/// non-ASCII letters/digits (invalid in K8s names), trailing `-` was kept,
/// and overlong names were not truncated.
fn pod_name_for(sandbox_name: &str) -> String {
    let sanitized: String = sandbox_name
        .to_lowercase()
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' {
                c
            } else {
                '-'
            }
        })
        .collect();
    let mut name = format!("agentkernel-{}", sanitized);
    // Kubernetes object names are limited to 253 characters.
    name.truncate(253);
    // Must end with an alphanumeric; the "agentkernel" prefix guarantees
    // a valid start and non-empty result even for all-symbol input.
    while name.ends_with('-') {
        name.pop();
    }
    name
}
/// Standard labels applied to every sandbox pod.
///
/// `agentkernel/sandbox` carries the logical name (also used as the
/// NetworkPolicy selector), `agentkernel/managed-by` marks ownership,
/// and `agentkernel/pool` marks the pod as active.
fn pod_labels(sandbox_name: &str) -> BTreeMap<String, String> {
    [
        ("agentkernel/sandbox", sandbox_name),
        ("agentkernel/managed-by", "agentkernel"),
        ("agentkernel/pool", "active"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect()
}
/// Construct the Pod manifest for this sandbox.
///
/// The single "sandbox" container runs `sh -c "sleep infinity"` so it
/// stays alive for exec-based command injection. Hardening applied:
/// non-root (uid 1000), no privilege escalation, all capabilities
/// dropped, optional read-only root filesystem, restart policy Never,
/// and no auto-mounted service-account token.
fn build_pod_spec(&self, config: &SandboxConfig) -> Pod {
    let pod_name = Self::pod_name_for(&self.name);
    let labels = Self::pod_labels(&self.name);
    let mut security_context = k8s_openapi::api::core::v1::SecurityContext {
        privileged: Some(false),
        allow_privilege_escalation: Some(false),
        read_only_root_filesystem: Some(config.read_only),
        run_as_non_root: Some(true),
        run_as_user: Some(1000),
        ..Default::default()
    };
    // Drop every Linux capability; the sandbox workload needs none.
    security_context.capabilities = Some(k8s_openapi::api::core::v1::Capabilities {
        drop: Some(vec!["ALL".to_string()]),
        ..Default::default()
    });
    let mut resource_limits = BTreeMap::new();
    resource_limits.insert(
        "memory".to_string(),
        k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!(
            "{}Mi",
            config.memory_mb
        )),
    );
    // vCPUs expressed as millicores (1 vCPU = 1000m).
    resource_limits.insert(
        "cpu".to_string(),
        k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!(
            "{}m",
            config.vcpus * 1000
        )),
    );
    // NOTE(review): requests is an empty map, so the API server defaults
    // requests from limits (Guaranteed QoS) — confirm this is intended.
    let resource_requests = BTreeMap::new();
    let resources = k8s_openapi::api::core::v1::ResourceRequirements {
        limits: Some(resource_limits),
        requests: Some(resource_requests),
        ..Default::default()
    };
    // Only declare ports when the config requests any.
    let container_ports: Option<Vec<k8s_openapi::api::core::v1::ContainerPort>> =
        if config.ports.is_empty() {
            None
        } else {
            Some(
                config
                    .ports
                    .iter()
                    .map(|pm| k8s_openapi::api::core::v1::ContainerPort {
                        // ContainerPort wants i32; port numbers fit (max 65535).
                        container_port: pm.container_port as i32,
                        protocol: Some(match pm.protocol {
                            super::PortProtocol::Tcp => "TCP".to_string(),
                            super::PortProtocol::Udp => "UDP".to_string(),
                        }),
                        ..Default::default()
                    })
                    .collect(),
            )
        };
    let container = Container {
        name: "sandbox".to_string(),
        image: Some(config.image.clone()),
        // Keep the container alive indefinitely; work arrives via exec.
        command: Some(vec![
            "sh".to_string(),
            "-c".to_string(),
            "sleep infinity".to_string(),
        ]),
        security_context: Some(security_context),
        resources: Some(resources),
        ports: container_ports,
        stdin: Some(true),
        tty: Some(true),
        ..Default::default()
    };
    // HashMap -> BTreeMap conversion (the K8s API type wants BTreeMap).
    let node_selector: Option<BTreeMap<String, String>> = if self.node_selector.is_empty() {
        None
    } else {
        Some(
            self.node_selector
                .iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect(),
        )
    };
    let pod_spec = PodSpec {
        containers: vec![container],
        restart_policy: Some("Never".to_string()),
        // Do not expose cluster credentials inside the sandbox.
        automount_service_account_token: Some(false),
        runtime_class_name: self.runtime_class.clone(),
        service_account_name: self.service_account.clone(),
        node_selector,
        ..Default::default()
    };
    Pod {
        metadata: ObjectMeta {
            name: Some(pod_name),
            namespace: Some(self.namespace.clone()),
            labels: Some(labels),
            annotations: Some({
                // NOTE(review): pod-security.kubernetes.io/enforce is normally
                // a *namespace label*; as a pod annotation it is informational
                // only — verify this has the intended effect.
                let mut ann = BTreeMap::new();
                ann.insert(
                    "pod-security.kubernetes.io/enforce".to_string(),
                    "restricted".to_string(),
                );
                ann
            }),
            ..Default::default()
        },
        spec: Some(pod_spec),
        ..Default::default()
    }
}
/// Create a deny-all NetworkPolicy scoped to this sandbox's pod.
///
/// Empty `ingress`/`egress` rule lists with both policy types listed
/// means all traffic in both directions is denied. The policy selects
/// the pod via its `agentkernel/sandbox` label.
async fn create_network_policy(&self, client: &Client) -> Result<()> {
    let policy_name = format!("{}-deny-all", Self::pod_name_for(&self.name));
    let selector_labels: BTreeMap<String, String> =
        std::iter::once(("agentkernel/sandbox".to_string(), self.name.clone())).collect();
    let policy = NetworkPolicy {
        metadata: ObjectMeta {
            name: Some(policy_name),
            namespace: Some(self.namespace.clone()),
            ..Default::default()
        },
        spec: Some(NetworkPolicySpec {
            pod_selector: LabelSelector {
                match_labels: Some(selector_labels),
                ..Default::default()
            },
            ingress: Some(Vec::new()),
            egress: Some(Vec::new()),
            policy_types: Some(vec!["Ingress".to_string(), "Egress".to_string()]),
        }),
    };
    Api::<NetworkPolicy>::namespaced(client.clone(), &self.namespace)
        .create(&PostParams::default(), &policy)
        .await
        .context("Failed to create NetworkPolicy")?;
    Ok(())
}
/// Best-effort removal of this sandbox's deny-all NetworkPolicy.
///
/// Deletion errors are ignored (the policy may already be gone), so
/// this always returns `Ok`.
async fn delete_network_policy(&self, client: &Client) -> Result<()> {
    let policy_name = format!("{}-deny-all", Self::pod_name_for(&self.name));
    let api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &self.namespace);
    let _ = api.delete(&policy_name, &DeleteParams::default()).await;
    Ok(())
}
/// Poll the pod until it reports phase `Running`.
///
/// Uses exponential backoff (50ms doubling, capped at 500ms) with a
/// 120-second deadline. Bails immediately if the pod reaches a terminal
/// phase (`Failed`/`Succeeded`) since it can never become Running.
async fn wait_for_running(&self, client: &Client, pod_name: &str) -> Result<()> {
    let pod_api: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
    let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120);
    let mut backoff_ms: u64 = 50;
    loop {
        let phase = pod_api.get(pod_name).await?.status.and_then(|s| s.phase);
        match phase.as_deref() {
            Some("Running") => return Ok(()),
            Some(done @ "Failed") | Some(done @ "Succeeded") => {
                bail!("Pod entered unexpected phase: {}", done);
            }
            _ => {}
        }
        if tokio::time::Instant::now() >= deadline {
            bail!("Timed out waiting for pod '{}' to start", pod_name);
        }
        tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
        backoff_ms = (backoff_ms * 2).min(500);
    }
}
}
#[async_trait]
impl Sandbox for KubernetesSandbox {
/// Create the sandbox pod and wait until it is Running.
///
/// Steps, in order: build a client, best-effort-create the namespace,
/// submit the pod manifest, apply a deny-all NetworkPolicy when
/// networking is disabled, then block until the pod reports Running.
async fn start(&mut self, config: &SandboxConfig) -> Result<()> {
    // Only the namespace is carried over; all other orchestrator settings
    // fall back to their defaults when building the client here.
    let orch_config = OrchestratorConfig {
        namespace: self.namespace.clone(),
        ..Default::default()
    };
    let client = Self::build_client(&orch_config).await?;
    // Best-effort namespace creation: errors (AlreadyExists, RBAC, ...)
    // are ignored — pod creation below surfaces real failures.
    let ns_api: Api<k8s_openapi::api::core::v1::Namespace> = Api::all(client.clone());
    if ns_api.get(&self.namespace).await.is_err() {
        let _ = ns_api
            .create(
                &PostParams::default(),
                &k8s_openapi::api::core::v1::Namespace {
                    metadata: ObjectMeta {
                        name: Some(self.namespace.clone()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
            )
            .await;
    }
    let pod = self.build_pod_spec(config);
    let pod_name = pod
        .metadata
        .name
        .clone()
        .unwrap_or_else(|| Self::pod_name_for(&self.name));
    let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
    pods.create(&PostParams::default(), &pod)
        .await
        .context("Failed to create K8s pod")?;
    // Apply isolation before waiting, so the policy is in place by the
    // time the pod starts running workloads.
    self.network_disabled = !config.network;
    if !config.network {
        self.create_network_policy(&client).await?;
        self.network_policy_created = true;
    }
    self.wait_for_running(&client, &pod_name).await?;
    self.pod_name = Some(pod_name);
    self.client = Some(client);
    self.running = true;
    Ok(())
}
/// Execute `cmd` in the sandbox container with no extra environment.
/// Thin wrapper over [`exec_with_env`] with an empty env slice.
async fn exec(&mut self, cmd: &[&str]) -> Result<ExecResult> {
    self.exec_with_env(cmd, &[]).await
}
/// Execute `cmd` in the sandbox container, optionally with `KEY=VALUE`
/// environment entries (applied via `env(1)` prefixing).
///
/// Lazily rebuilds the client/pod name so exec works on a handle
/// reconstructed from just a sandbox name. Captures stdout/stderr fully
/// and reports the command's real exit code taken from the exec Status
/// frame — the previous version guessed "non-empty stderr == failure",
/// which misreported commands that warn on stderr yet succeed (and
/// failures that print nothing to stderr).
async fn exec_with_env(&mut self, cmd: &[&str], env: &[String]) -> Result<ExecResult> {
    if self.client.is_none() {
        let orch_config = OrchestratorConfig {
            namespace: self.namespace.clone(),
            ..Default::default()
        };
        let client = Self::build_client(&orch_config).await?;
        self.client = Some(client);
    }
    if self.pod_name.is_none() {
        self.pod_name = Some(Self::pod_name_for(&self.name));
    }
    let client = self.client.as_ref().unwrap();
    let pod_name = self.pod_name.as_ref().unwrap();
    let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
    // Prefix with `env K=V ...` when environment overrides are requested.
    let full_cmd: Vec<String> = if env.is_empty() {
        cmd.iter().map(|s| s.to_string()).collect()
    } else {
        let mut parts = vec!["env".to_string()];
        parts.extend(env.iter().cloned());
        parts.extend(cmd.iter().map(|s| s.to_string()));
        parts
    };
    let mut attached = pods
        .exec(
            pod_name,
            full_cmd,
            &kube::api::AttachParams::default()
                .container("sandbox")
                .stdout(true)
                .stderr(true),
        )
        .await
        .context("Failed to exec in K8s pod")?;
    let mut stdout_reader = attached
        .stdout()
        .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
    let mut stderr_reader = attached
        .stderr()
        .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
    // Grab the status future before join() consumes the process handle.
    let status_fut = attached.take_status();
    let mut stdout_buf = Vec::new();
    let mut stderr_buf = Vec::new();
    let (stdout_result, stderr_result) = tokio::join!(
        stdout_reader.read_to_end(&mut stdout_buf),
        stderr_reader.read_to_end(&mut stderr_buf),
    );
    stdout_result.context("Failed to read stdout")?;
    stderr_result.context("Failed to read stderr")?;
    let stdout = String::from_utf8_lossy(&stdout_buf).to_string();
    let stderr = String::from_utf8_lossy(&stderr_buf).to_string();
    // Derive the real exit code from the exec Status: "Success" => 0;
    // otherwise parse the ExitCode cause, defaulting to 1 when absent.
    let exit_code = match status_fut {
        Some(fut) => match fut.await {
            Some(status) if status.status.as_deref() == Some("Success") => 0,
            Some(status) => status
                .details
                .as_ref()
                .and_then(|details| details.causes.as_ref())
                .and_then(|causes| {
                    causes
                        .iter()
                        .find(|cause| cause.reason.as_deref() == Some("ExitCode"))
                })
                .and_then(|cause| cause.message.as_ref())
                .and_then(|msg| msg.trim().parse::<i32>().ok())
                .unwrap_or(1),
            // No status frame received: assume success (streams closed cleanly).
            None => 0,
        },
        None => 0,
    };
    let _ = attached.join().await;
    Ok(ExecResult {
        exit_code,
        stdout,
        stderr,
    })
}
/// Delete the sandbox pod (and its NetworkPolicy, if we created one).
///
/// Best-effort and idempotent: the client and pod name are rebuilt if
/// missing so `stop` works on a reconstructed handle, and deletion
/// errors are swallowed. Local state is reset regardless.
async fn stop(&mut self) -> Result<()> {
    if self.client.is_none() {
        let orch_config = OrchestratorConfig {
            namespace: self.namespace.clone(),
            ..Default::default()
        };
        // If we can't even build a client, fall through and just reset state.
        if let Ok(client) = Self::build_client(&orch_config).await {
            self.client = Some(client);
        }
    }
    if self.pod_name.is_none() {
        self.pod_name = Some(Self::pod_name_for(&self.name));
    }
    if let (Some(client), Some(pod_name)) = (&self.client, &self.pod_name) {
        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
        // Result intentionally ignored: the pod may already be gone.
        let _ = pods
            .delete(pod_name, &DeleteParams::default())
            .await
            .context("Failed to delete K8s pod");
        if self.network_policy_created {
            let _ = self.delete_network_policy(client).await;
        }
    }
    self.running = false;
    self.pod_name = None;
    Ok(())
}
/// Logical sandbox name (not the derived pod name).
fn name(&self) -> &str {
    &self.name
}
/// Identifies this sandbox as the Kubernetes backend.
fn backend_type(&self) -> BackendType {
    BackendType::Kubernetes
}
/// Lifecycle flag maintained by start()/stop(); not a live cluster probe.
fn is_running(&self) -> bool {
    self.running
}
/// Write `content` to `path` inside the pod, without path validation.
///
/// The payload is base64-encoded on the host and decoded in the
/// container so arbitrary bytes survive the shell round-trip. Parent
/// directories are created first. Fails with the remote stderr when the
/// write command exits non-zero.
///
/// Fix: the path is now single-quote-escaped for the shell — previously
/// a `'` in the path broke out of the quoted command (shell injection).
async fn write_file_unchecked(&mut self, path: &str, content: &[u8]) -> Result<()> {
    use base64::Engine;
    // POSIX-safe single quoting: close the quote, emit \' and reopen.
    let quoted_path = format!("'{}'", path.replace('\'', r"'\''"));
    let encoded = base64::engine::general_purpose::STANDARD.encode(content);
    if let Some(parent) = std::path::Path::new(path).parent() {
        let parent_str = parent.to_string_lossy();
        if parent_str != "/" {
            let quoted_parent = format!("'{}'", parent_str.replace('\'', r"'\''"));
            let mkdir_cmd = format!("mkdir -p {}", quoted_parent);
            self.exec(&["sh", "-c", &mkdir_cmd]).await?;
        }
    }
    // The base64 alphabet contains no quotes or shell metacharacters, so
    // single-quoting the payload directly is safe.
    let write_cmd = format!("echo '{}' | base64 -d > {}", encoded, quoted_path);
    let result = self.exec(&["sh", "-c", &write_cmd]).await?;
    if !result.is_success() {
        bail!("Failed to write file {}: {}", path, result.stderr);
    }
    Ok(())
}
/// Read the file at `path` from the pod, without path validation.
///
/// The container base64-encodes the file so arbitrary bytes survive the
/// text channel; the host decodes it back.
///
/// Fixes: (1) the path is single-quote-escaped for the shell (a `'` in
/// the path previously broke the command); (2) `base64` wraps output at
/// 76 columns by default, so ALL whitespace is stripped before decoding
/// — `trim()` alone left interior newlines and failed on files larger
/// than ~57 bytes.
async fn read_file_unchecked(&mut self, path: &str) -> Result<Vec<u8>> {
    use base64::Engine;
    let quoted_path = format!("'{}'", path.replace('\'', r"'\''"));
    let read_cmd = format!("base64 {}", quoted_path);
    let result = self.exec(&["sh", "-c", &read_cmd]).await?;
    if !result.is_success() {
        bail!("Failed to read file {}: {}", path, result.stderr);
    }
    // Remove line wrapping and any stray whitespace before decoding.
    let compact: String = result.stdout.split_whitespace().collect();
    let decoded = base64::engine::general_purpose::STANDARD
        .decode(compact)
        .context("Failed to decode base64 file content")?;
    Ok(decoded)
}
/// Remove the file at `path` in the pod (best-effort, `rm -f`), without
/// path validation.
///
/// Fix: the path is now single-quote-escaped for the shell — previously
/// a `'` in the path broke out of the quoted command.
async fn remove_file_unchecked(&mut self, path: &str) -> Result<()> {
    let quoted_path = format!("'{}'", path.replace('\'', r"'\''"));
    let rm_cmd = format!("rm -f {}", quoted_path);
    self.exec(&["sh", "-c", &rm_cmd]).await?;
    Ok(())
}
/// Create a directory at `path` in the pod (optionally `mkdir -p`),
/// without path validation. The command's exit status is not checked,
/// matching `remove_file_unchecked`.
///
/// Fix: the path is now single-quote-escaped for the shell — previously
/// a `'` in the path broke out of the quoted command.
async fn mkdir_unchecked(&mut self, path: &str, recursive: bool) -> Result<()> {
    let quoted_path = format!("'{}'", path.replace('\'', r"'\''"));
    let cmd = if recursive {
        format!("mkdir -p {}", quoted_path)
    } else {
        format!("mkdir {}", quoted_path)
    };
    self.exec(&["sh", "-c", &cmd]).await?;
    Ok(())
}
/// Attach an interactive shell (default `/bin/sh`) to the sandbox.
///
/// Bridges host stdin/stdout to a TTY exec session and returns when
/// either direction closes. Always returns exit code 0 — the remote
/// shell's own exit status is not propagated.
/// NOTE(review): the host terminal is not switched to raw mode here, so
/// input is line-buffered and control sequences may not pass through —
/// confirm callers handle terminal setup.
async fn attach(&mut self, shell: Option<&str>) -> Result<i32> {
    // Unlike exec_with_env, attach requires start() to have run first.
    let client = self
        .client
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("K8s client not initialized"))?;
    let pod_name = self
        .pod_name
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Pod not started"))?;
    let shell = shell.unwrap_or("/bin/sh");
    let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
    let mut attached = pods
        .exec(
            pod_name,
            vec![shell.to_string()],
            &kube::api::AttachParams::default()
                .container("sandbox")
                .stdin(true)
                .stdout(true)
                .stderr(true)
                .tty(true),
        )
        .await
        .context("Failed to attach to K8s pod")?;
    let mut stdin_writer = attached
        .stdin()
        .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
    let mut stdout_reader = attached
        .stdout()
        .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
    // Pump host stdin -> container and container stdout -> host concurrently.
    let stdin_handle = tokio::spawn(async move {
        let mut host_stdin = tokio::io::stdin();
        let _ = tokio::io::copy(&mut host_stdin, &mut stdin_writer).await;
    });
    let stdout_handle = tokio::spawn(async move {
        let mut host_stdout = tokio::io::stdout();
        let _ = tokio::io::copy(&mut stdout_reader, &mut host_stdout).await;
    });
    // First side to finish ends the session; the other task is left to
    // terminate on its own when its stream closes.
    tokio::select! {
        _ = stdin_handle => {},
        _ = stdout_handle => {},
    }
    Ok(0)
}
/// Copy each injection into the sandbox, creating parent directories
/// first via the checked `mkdir`/`write_file` trait methods.
async fn inject_files(&mut self, files: &[super::FileInjection]) -> Result<()> {
    for injection in files {
        let parent_dir = std::path::Path::new(&injection.dest)
            .parent()
            .map(|p| p.to_string_lossy().into_owned());
        // Skip mkdir for root ("/") and for paths with no parent.
        if let Some(dir) = parent_dir {
            if dir != "/" {
                self.mkdir(&dir, true).await?;
            }
        }
        self.write_file(&injection.dest, &injection.content).await?;
    }
    Ok(())
}
}
/// Expand a leading `~` or `~/` prefix to the value of `$HOME`.
///
/// Returns the path unchanged when it has no tilde prefix or when `HOME`
/// is unset. Only the current user's home is supported; `~other/...`
/// passes through untouched.
///
/// Fix: a bare `~` (no trailing slash) is now expanded too; previously
/// only `~/...` was handled.
fn tilde_expand(path: &str) -> String {
    if path == "~" || path.starts_with("~/") {
        if let Some(home) = std::env::var_os("HOME") {
            // Byte slice is safe: the stripped prefix "~" is one ASCII byte.
            return format!("{}{}", home.to_string_lossy(), &path[1..]);
        }
    }
    path.to_string()
}