1use anyhow::{Context, Result, bail};
9use async_trait::async_trait;
10use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
11use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicySpec};
12use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
13use kube::api::{Api, DeleteParams, PostParams};
14use kube::config::{KubeConfigOptions, Kubeconfig};
15use kube::{Client, Config as KubeConfig};
16use std::collections::{BTreeMap, HashMap};
17use tokio::io::AsyncReadExt;
18
19use super::{BackendType, ExecResult, Sandbox, SandboxConfig};
20use crate::config::OrchestratorConfig;
21
22pub struct KubernetesSandbox {
24 name: String,
26 namespace: String,
28 pod_name: Option<String>,
30 running: bool,
32 client: Option<Client>,
34 runtime_class: Option<String>,
36 service_account: Option<String>,
38 node_selector: HashMap<String, String>,
40 network_policy_created: bool,
42 network_disabled: bool,
44}
45
46impl KubernetesSandbox {
47 pub fn new(name: &str, config: &OrchestratorConfig) -> Self {
49 Self {
50 name: name.to_string(),
51 namespace: config.namespace.clone(),
52 pod_name: None,
53 running: false,
54 client: None,
55 runtime_class: config.runtime_class.clone(),
56 service_account: config.service_account.clone(),
57 node_selector: config.node_selector.clone(),
58 network_policy_created: false,
59 network_disabled: false,
60 }
61 }
62
63 async fn build_client(config: &OrchestratorConfig) -> Result<Client> {
65 if let Ok(config) = KubeConfig::incluster() {
67 return Client::try_from(config).context("Failed to create in-cluster K8s client");
68 }
69
70 let kubeconfig = if let Some(ref path) = config.kubeconfig {
72 let expanded = tilde_expand(path);
73 Kubeconfig::read_from(expanded).context("Failed to read kubeconfig")?
74 } else {
75 Kubeconfig::read().context("Failed to read default kubeconfig")?
76 };
77
78 let mut options = KubeConfigOptions::default();
79 if let Some(ref ctx) = config.context {
80 options.context = Some(ctx.clone());
81 }
82
83 let kube_config = KubeConfig::from_custom_kubeconfig(kubeconfig, &options)
84 .await
85 .context("Failed to build K8s config from kubeconfig")?;
86
87 Client::try_from(kube_config).context("Failed to create K8s client")
88 }
89
90 fn pod_name_for(sandbox_name: &str) -> String {
92 let sanitized: String = sandbox_name
94 .to_lowercase()
95 .chars()
96 .map(|c| {
97 if c.is_alphanumeric() || c == '-' {
98 c
99 } else {
100 '-'
101 }
102 })
103 .collect();
104 format!("agentkernel-{}", sanitized)
105 }
106
107 fn pod_labels(sandbox_name: &str) -> BTreeMap<String, String> {
109 let mut labels = BTreeMap::new();
110 labels.insert("agentkernel/sandbox".to_string(), sandbox_name.to_string());
111 labels.insert(
112 "agentkernel/managed-by".to_string(),
113 "agentkernel".to_string(),
114 );
115 labels.insert("agentkernel/pool".to_string(), "active".to_string());
116 labels
117 }
118
119 fn build_pod_spec(&self, config: &SandboxConfig) -> Pod {
121 let pod_name = Self::pod_name_for(&self.name);
122 let labels = Self::pod_labels(&self.name);
123
124 let mut security_context = k8s_openapi::api::core::v1::SecurityContext {
126 privileged: Some(false),
127 allow_privilege_escalation: Some(false),
128 read_only_root_filesystem: Some(config.read_only),
129 run_as_non_root: Some(true),
130 run_as_user: Some(1000),
131 ..Default::default()
132 };
133
134 security_context.capabilities = Some(k8s_openapi::api::core::v1::Capabilities {
136 drop: Some(vec!["ALL".to_string()]),
137 ..Default::default()
138 });
139
140 let mut resource_limits = BTreeMap::new();
142 resource_limits.insert(
143 "memory".to_string(),
144 k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!(
145 "{}Mi",
146 config.memory_mb
147 )),
148 );
149 resource_limits.insert(
150 "cpu".to_string(),
151 k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!(
152 "{}m",
153 config.vcpus * 1000
154 )),
155 );
156
157 let resource_requests = BTreeMap::new();
158
159 let resources = k8s_openapi::api::core::v1::ResourceRequirements {
160 limits: Some(resource_limits),
161 requests: Some(resource_requests),
162 ..Default::default()
163 };
164
165 let container_ports: Option<Vec<k8s_openapi::api::core::v1::ContainerPort>> =
167 if config.ports.is_empty() {
168 None
169 } else {
170 Some(
171 config
172 .ports
173 .iter()
174 .map(|pm| k8s_openapi::api::core::v1::ContainerPort {
175 container_port: pm.container_port as i32,
176 protocol: Some(match pm.protocol {
177 super::PortProtocol::Tcp => "TCP".to_string(),
178 super::PortProtocol::Udp => "UDP".to_string(),
179 }),
180 ..Default::default()
181 })
182 .collect(),
183 )
184 };
185
186 let container = Container {
188 name: "sandbox".to_string(),
189 image: Some(config.image.clone()),
190 command: Some(vec![
191 "sh".to_string(),
192 "-c".to_string(),
193 "sleep infinity".to_string(),
194 ]),
195 security_context: Some(security_context),
196 resources: Some(resources),
197 ports: container_ports,
198 stdin: Some(true),
199 tty: Some(true),
200 ..Default::default()
201 };
202
203 let node_selector: Option<BTreeMap<String, String>> = if self.node_selector.is_empty() {
205 None
206 } else {
207 Some(
208 self.node_selector
209 .iter()
210 .map(|(k, v)| (k.clone(), v.clone()))
211 .collect(),
212 )
213 };
214
215 let pod_spec = PodSpec {
217 containers: vec![container],
218 restart_policy: Some("Never".to_string()),
219 automount_service_account_token: Some(false),
220 runtime_class_name: self.runtime_class.clone(),
221 service_account_name: self.service_account.clone(),
222 node_selector,
223 ..Default::default()
224 };
225
226 Pod {
227 metadata: ObjectMeta {
228 name: Some(pod_name),
229 namespace: Some(self.namespace.clone()),
230 labels: Some(labels),
231 annotations: Some({
232 let mut ann = BTreeMap::new();
233 ann.insert(
234 "pod-security.kubernetes.io/enforce".to_string(),
235 "restricted".to_string(),
236 );
237 ann
238 }),
239 ..Default::default()
240 },
241 spec: Some(pod_spec),
242 ..Default::default()
243 }
244 }
245
246 async fn create_network_policy(&self, client: &Client) -> Result<()> {
248 let np_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &self.namespace);
249
250 let pod_name = Self::pod_name_for(&self.name);
251 let np_name = format!("{}-deny-all", pod_name);
252
253 let mut match_labels = BTreeMap::new();
254 match_labels.insert("agentkernel/sandbox".to_string(), self.name.clone());
255
256 let np = NetworkPolicy {
257 metadata: ObjectMeta {
258 name: Some(np_name),
259 namespace: Some(self.namespace.clone()),
260 ..Default::default()
261 },
262 spec: Some(NetworkPolicySpec {
263 pod_selector: LabelSelector {
264 match_labels: Some(match_labels),
265 ..Default::default()
266 },
267 ingress: Some(vec![]),
269 egress: Some(vec![]),
270 policy_types: Some(vec!["Ingress".to_string(), "Egress".to_string()]),
271 }),
272 };
273
274 np_api
275 .create(&PostParams::default(), &np)
276 .await
277 .context("Failed to create NetworkPolicy")?;
278
279 Ok(())
280 }
281
282 async fn delete_network_policy(&self, client: &Client) -> Result<()> {
284 let np_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &self.namespace);
285 let pod_name = Self::pod_name_for(&self.name);
286 let np_name = format!("{}-deny-all", pod_name);
287
288 let _ = np_api.delete(&np_name, &DeleteParams::default()).await;
289 Ok(())
290 }
291
292 async fn wait_for_running(&self, client: &Client, pod_name: &str) -> Result<()> {
295 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
296 let mut delay_ms: u64 = 50;
297 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120);
298
299 loop {
300 let pod = pods.get(pod_name).await?;
301 if let Some(status) = &pod.status
302 && let Some(phase) = &status.phase
303 {
304 match phase.as_str() {
305 "Running" => return Ok(()),
306 "Failed" | "Succeeded" => {
307 bail!("Pod entered unexpected phase: {}", phase);
308 }
309 _ => {} }
311 }
312
313 if tokio::time::Instant::now() >= deadline {
314 bail!("Timed out waiting for pod '{}' to start", pod_name);
315 }
316
317 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
318 delay_ms = (delay_ms * 2).min(500);
319 }
320 }
321}
322
323#[async_trait]
324impl Sandbox for KubernetesSandbox {
325 async fn start(&mut self, config: &SandboxConfig) -> Result<()> {
326 let orch_config = OrchestratorConfig {
328 namespace: self.namespace.clone(),
329 ..Default::default()
330 };
331 let client = Self::build_client(&orch_config).await?;
332
333 let ns_api: Api<k8s_openapi::api::core::v1::Namespace> = Api::all(client.clone());
335 if ns_api.get(&self.namespace).await.is_err() {
336 let _ = ns_api
337 .create(
338 &PostParams::default(),
339 &k8s_openapi::api::core::v1::Namespace {
340 metadata: ObjectMeta {
341 name: Some(self.namespace.clone()),
342 ..Default::default()
343 },
344 ..Default::default()
345 },
346 )
347 .await;
348 }
349
350 let pod = self.build_pod_spec(config);
352 let pod_name = pod
353 .metadata
354 .name
355 .clone()
356 .unwrap_or_else(|| Self::pod_name_for(&self.name));
357
358 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
359 pods.create(&PostParams::default(), &pod)
360 .await
361 .context("Failed to create K8s pod")?;
362
363 self.network_disabled = !config.network;
365 if !config.network {
366 self.create_network_policy(&client).await?;
367 self.network_policy_created = true;
368 }
369
370 self.wait_for_running(&client, &pod_name).await?;
372
373 self.pod_name = Some(pod_name);
374 self.client = Some(client);
375 self.running = true;
376
377 Ok(())
378 }
379
380 async fn exec(&mut self, cmd: &[&str]) -> Result<ExecResult> {
381 self.exec_with_env(cmd, &[]).await
382 }
383
384 async fn exec_with_env(&mut self, cmd: &[&str], env: &[String]) -> Result<ExecResult> {
385 if self.client.is_none() {
387 let orch_config = OrchestratorConfig {
388 namespace: self.namespace.clone(),
389 ..Default::default()
390 };
391 let client = Self::build_client(&orch_config).await?;
392 self.client = Some(client);
393 }
394 if self.pod_name.is_none() {
395 self.pod_name = Some(Self::pod_name_for(&self.name));
396 }
397
398 let client = self.client.as_ref().unwrap();
399 let pod_name = self.pod_name.as_ref().unwrap();
400
401 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
402
403 let full_cmd: Vec<String> = if env.is_empty() {
405 cmd.iter().map(|s| s.to_string()).collect()
406 } else {
407 let mut parts = vec!["env".to_string()];
409 parts.extend(env.iter().cloned());
410 parts.extend(cmd.iter().map(|s| s.to_string()));
411 parts
412 };
413
414 let mut attached = pods
416 .exec(
417 pod_name,
418 full_cmd,
419 &kube::api::AttachParams::default()
420 .container("sandbox")
421 .stdout(true)
422 .stderr(true),
423 )
424 .await
425 .context("Failed to exec in K8s pod")?;
426
427 let mut stdout_reader = attached
429 .stdout()
430 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
431 let mut stderr_reader = attached
432 .stderr()
433 .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
434
435 let mut stdout_buf = Vec::new();
436 let mut stderr_buf = Vec::new();
437
438 let (stdout_result, stderr_result) = tokio::join!(
439 stdout_reader.read_to_end(&mut stdout_buf),
440 stderr_reader.read_to_end(&mut stderr_buf),
441 );
442
443 stdout_result.context("Failed to read stdout")?;
444 stderr_result.context("Failed to read stderr")?;
445
446 let stdout = String::from_utf8_lossy(&stdout_buf).to_string();
447 let stderr = String::from_utf8_lossy(&stderr_buf).to_string();
448
449 let _ = attached.join().await;
451 let exit_code = if stderr.is_empty() { 0 } else { 1 };
452
453 Ok(ExecResult {
454 exit_code,
455 stdout,
456 stderr,
457 })
458 }
459
460 async fn stop(&mut self) -> Result<()> {
461 if self.client.is_none() {
463 let orch_config = OrchestratorConfig {
464 namespace: self.namespace.clone(),
465 ..Default::default()
466 };
467 if let Ok(client) = Self::build_client(&orch_config).await {
468 self.client = Some(client);
469 }
470 }
471 if self.pod_name.is_none() {
472 self.pod_name = Some(Self::pod_name_for(&self.name));
473 }
474
475 if let (Some(client), Some(pod_name)) = (&self.client, &self.pod_name) {
476 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
477
478 let _ = pods
480 .delete(pod_name, &DeleteParams::default())
481 .await
482 .context("Failed to delete K8s pod");
483
484 if self.network_policy_created {
486 let _ = self.delete_network_policy(client).await;
487 }
488 }
489
490 self.running = false;
491 self.pod_name = None;
492 Ok(())
493 }
494
495 fn name(&self) -> &str {
496 &self.name
497 }
498
499 fn backend_type(&self) -> BackendType {
500 BackendType::Kubernetes
501 }
502
503 fn is_running(&self) -> bool {
504 self.running
505 }
506
507 async fn write_file_unchecked(&mut self, path: &str, content: &[u8]) -> Result<()> {
508 use base64::Engine;
509 let encoded = base64::engine::general_purpose::STANDARD.encode(content);
510
511 if let Some(parent) = std::path::Path::new(path).parent() {
513 let parent_str = parent.to_string_lossy();
514 if parent_str != "/" {
515 let mkdir_cmd = format!("mkdir -p '{}'", parent_str);
516 self.exec(&["sh", "-c", &mkdir_cmd]).await?;
517 }
518 }
519
520 let write_cmd = format!("echo '{}' | base64 -d > '{}'", encoded, path);
522 let result = self.exec(&["sh", "-c", &write_cmd]).await?;
523
524 if !result.is_success() {
525 bail!("Failed to write file {}: {}", path, result.stderr);
526 }
527
528 Ok(())
529 }
530
531 async fn read_file_unchecked(&mut self, path: &str) -> Result<Vec<u8>> {
532 let read_cmd = format!("base64 '{}'", path);
533 let result = self.exec(&["sh", "-c", &read_cmd]).await?;
534
535 if !result.is_success() {
536 bail!("Failed to read file {}: {}", path, result.stderr);
537 }
538
539 use base64::Engine;
540 let decoded = base64::engine::general_purpose::STANDARD
541 .decode(result.stdout.trim())
542 .context("Failed to decode base64 file content")?;
543
544 Ok(decoded)
545 }
546
547 async fn remove_file_unchecked(&mut self, path: &str) -> Result<()> {
548 let rm_cmd = format!("rm -f '{}'", path);
549 self.exec(&["sh", "-c", &rm_cmd]).await?;
550 Ok(())
551 }
552
553 async fn mkdir_unchecked(&mut self, path: &str, recursive: bool) -> Result<()> {
554 let flag = if recursive { "-p" } else { "" };
555 let cmd = format!("mkdir {} '{}'", flag, path);
556 self.exec(&["sh", "-c", &cmd]).await?;
557 Ok(())
558 }
559
560 async fn attach(&mut self, shell: Option<&str>) -> Result<i32> {
561 let client = self
562 .client
563 .as_ref()
564 .ok_or_else(|| anyhow::anyhow!("K8s client not initialized"))?;
565 let pod_name = self
566 .pod_name
567 .as_ref()
568 .ok_or_else(|| anyhow::anyhow!("Pod not started"))?;
569
570 let shell = shell.unwrap_or("/bin/sh");
571 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
572
573 let mut attached = pods
574 .exec(
575 pod_name,
576 vec![shell.to_string()],
577 &kube::api::AttachParams::default()
578 .container("sandbox")
579 .stdin(true)
580 .stdout(true)
581 .stderr(true)
582 .tty(true),
583 )
584 .await
585 .context("Failed to attach to K8s pod")?;
586
587 let mut stdin_writer = attached
589 .stdin()
590 .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
591 let mut stdout_reader = attached
592 .stdout()
593 .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
594
595 let stdin_handle = tokio::spawn(async move {
596 let mut host_stdin = tokio::io::stdin();
597 let _ = tokio::io::copy(&mut host_stdin, &mut stdin_writer).await;
598 });
599
600 let stdout_handle = tokio::spawn(async move {
601 let mut host_stdout = tokio::io::stdout();
602 let _ = tokio::io::copy(&mut stdout_reader, &mut host_stdout).await;
603 });
604
605 tokio::select! {
607 _ = stdin_handle => {},
608 _ = stdout_handle => {},
609 }
610
611 Ok(0)
612 }
613
614 async fn inject_files(&mut self, files: &[super::FileInjection]) -> Result<()> {
615 for file in files {
616 if let Some(parent) = std::path::Path::new(&file.dest).parent() {
618 let parent_str = parent.to_string_lossy();
619 if parent_str != "/" {
620 self.mkdir(&parent_str, true).await?;
621 }
622 }
623 self.write_file(&file.dest, &file.content).await?;
625 }
626 Ok(())
627 }
628}
629
/// Expands a leading `~` in `path` to the value of `$HOME`.
///
/// Both a bare `~` and a `~/...` prefix are expanded. `~user` forms and all other
/// paths are returned unchanged, as is any path when `HOME` is unset.
fn tilde_expand(path: &str) -> String {
    let suffix = match path.strip_prefix('~') {
        Some(rest) if rest.is_empty() || rest.starts_with('/') => rest,
        _ => return path.to_string(),
    };
    match std::env::var_os("HOME") {
        Some(home) => format!("{}{}", home.to_string_lossy(), suffix),
        None => path.to_string(),
    }
}