// agentkernel/backend/kubernetes.rs
//! Kubernetes backend implementing the Sandbox trait.
//!
//! Each sandbox is a Kubernetes Pod. start() creates a Pod with `sleep infinity`,
//! exec() runs commands via the K8s exec API (WebSocket), stop() deletes the Pod.
//!
//! Compile with `--features kubernetes` to enable.

use std::collections::{BTreeMap, HashMap};

use anyhow::{Context, Result, bail};
use async_trait::async_trait;
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicySpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
use kube::api::{Api, DeleteParams, PostParams};
use kube::config::{KubeConfigOptions, Kubeconfig};
use kube::{Client, Config as KubeConfig};
use tokio::io::AsyncReadExt;

use super::{BackendType, ExecResult, Sandbox, SandboxConfig};
use crate::config::OrchestratorConfig;
21
/// Kubernetes Pod-based sandbox.
///
/// One instance maps to one Pod in `namespace`: `start()` creates the Pod,
/// commands run through the K8s exec API, and `stop()` deletes it.
pub struct KubernetesSandbox {
    /// Sandbox name (sanitized into the pod name; see `pod_name_for`)
    name: String,
    /// Kubernetes namespace for this sandbox's pod
    namespace: String,
    /// Pod name (set after start(); re-derived from `name` on reconnect)
    pod_name: Option<String>,
    /// Whether the sandbox is running
    running: bool,
    /// Kubernetes API client (initialized lazily on start())
    client: Option<Client>,
    /// Optional runtime class (e.g., "gvisor", "kata")
    runtime_class: Option<String>,
    /// Optional service account for the pod
    service_account: Option<String>,
    /// Node selector labels for scheduling
    node_selector: HashMap<String, String>,
    /// Whether a NetworkPolicy was created (for cleanup in stop())
    network_policy_created: bool,
    /// Whether network is disabled (used to decide on NetworkPolicy)
    network_disabled: bool,
}
45
46impl KubernetesSandbox {
47    /// Create a new Kubernetes sandbox from orchestrator configuration
48    pub fn new(name: &str, config: &OrchestratorConfig) -> Self {
49        Self {
50            name: name.to_string(),
51            namespace: config.namespace.clone(),
52            pod_name: None,
53            running: false,
54            client: None,
55            runtime_class: config.runtime_class.clone(),
56            service_account: config.service_account.clone(),
57            node_selector: config.node_selector.clone(),
58            network_policy_created: false,
59            network_disabled: false,
60        }
61    }
62
63    /// Build the Kubernetes API client
64    async fn build_client(config: &OrchestratorConfig) -> Result<Client> {
65        // Try in-cluster config first (when running inside K8s)
66        if let Ok(config) = KubeConfig::incluster() {
67            return Client::try_from(config).context("Failed to create in-cluster K8s client");
68        }
69
70        // Fall back to kubeconfig
71        let kubeconfig = if let Some(ref path) = config.kubeconfig {
72            let expanded = tilde_expand(path);
73            Kubeconfig::read_from(expanded).context("Failed to read kubeconfig")?
74        } else {
75            Kubeconfig::read().context("Failed to read default kubeconfig")?
76        };
77
78        let mut options = KubeConfigOptions::default();
79        if let Some(ref ctx) = config.context {
80            options.context = Some(ctx.clone());
81        }
82
83        let kube_config = KubeConfig::from_custom_kubeconfig(kubeconfig, &options)
84            .await
85            .context("Failed to build K8s config from kubeconfig")?;
86
87        Client::try_from(kube_config).context("Failed to create K8s client")
88    }
89
90    /// Generate the pod name for this sandbox
91    fn pod_name_for(sandbox_name: &str) -> String {
92        // K8s names must be DNS-compatible: lowercase, alphanumeric, hyphens
93        let sanitized: String = sandbox_name
94            .to_lowercase()
95            .chars()
96            .map(|c| {
97                if c.is_alphanumeric() || c == '-' {
98                    c
99                } else {
100                    '-'
101                }
102            })
103            .collect();
104        format!("agentkernel-{}", sanitized)
105    }
106
107    /// Standard labels for all agentkernel-managed pods
108    fn pod_labels(sandbox_name: &str) -> BTreeMap<String, String> {
109        let mut labels = BTreeMap::new();
110        labels.insert("agentkernel/sandbox".to_string(), sandbox_name.to_string());
111        labels.insert(
112            "agentkernel/managed-by".to_string(),
113            "agentkernel".to_string(),
114        );
115        labels.insert("agentkernel/pool".to_string(), "active".to_string());
116        labels
117    }
118
119    /// Build the Pod spec for this sandbox
120    fn build_pod_spec(&self, config: &SandboxConfig) -> Pod {
121        let pod_name = Self::pod_name_for(&self.name);
122        let labels = Self::pod_labels(&self.name);
123
124        // Build container security context
125        let mut security_context = k8s_openapi::api::core::v1::SecurityContext {
126            privileged: Some(false),
127            allow_privilege_escalation: Some(false),
128            read_only_root_filesystem: Some(config.read_only),
129            run_as_non_root: Some(true),
130            run_as_user: Some(1000),
131            ..Default::default()
132        };
133
134        // Drop all capabilities
135        security_context.capabilities = Some(k8s_openapi::api::core::v1::Capabilities {
136            drop: Some(vec!["ALL".to_string()]),
137            ..Default::default()
138        });
139
140        // Resource limits
141        let mut resource_limits = BTreeMap::new();
142        resource_limits.insert(
143            "memory".to_string(),
144            k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!(
145                "{}Mi",
146                config.memory_mb
147            )),
148        );
149        resource_limits.insert(
150            "cpu".to_string(),
151            k8s_openapi::apimachinery::pkg::api::resource::Quantity(format!(
152                "{}m",
153                config.vcpus * 1000
154            )),
155        );
156
157        let resource_requests = BTreeMap::new();
158
159        let resources = k8s_openapi::api::core::v1::ResourceRequirements {
160            limits: Some(resource_limits),
161            requests: Some(resource_requests),
162            ..Default::default()
163        };
164
165        // Build container port specs
166        let container_ports: Option<Vec<k8s_openapi::api::core::v1::ContainerPort>> =
167            if config.ports.is_empty() {
168                None
169            } else {
170                Some(
171                    config
172                        .ports
173                        .iter()
174                        .map(|pm| k8s_openapi::api::core::v1::ContainerPort {
175                            container_port: pm.container_port as i32,
176                            protocol: Some(match pm.protocol {
177                                super::PortProtocol::Tcp => "TCP".to_string(),
178                                super::PortProtocol::Udp => "UDP".to_string(),
179                            }),
180                            ..Default::default()
181                        })
182                        .collect(),
183                )
184            };
185
186        // Main container: sleep infinity as entrypoint
187        let container = Container {
188            name: "sandbox".to_string(),
189            image: Some(config.image.clone()),
190            command: Some(vec![
191                "sh".to_string(),
192                "-c".to_string(),
193                "sleep infinity".to_string(),
194            ]),
195            security_context: Some(security_context),
196            resources: Some(resources),
197            ports: container_ports,
198            stdin: Some(true),
199            tty: Some(true),
200            ..Default::default()
201        };
202
203        // Build node selector
204        let node_selector: Option<BTreeMap<String, String>> = if self.node_selector.is_empty() {
205            None
206        } else {
207            Some(
208                self.node_selector
209                    .iter()
210                    .map(|(k, v)| (k.clone(), v.clone()))
211                    .collect(),
212            )
213        };
214
215        // Pod spec
216        let pod_spec = PodSpec {
217            containers: vec![container],
218            restart_policy: Some("Never".to_string()),
219            automount_service_account_token: Some(false),
220            runtime_class_name: self.runtime_class.clone(),
221            service_account_name: self.service_account.clone(),
222            node_selector,
223            ..Default::default()
224        };
225
226        Pod {
227            metadata: ObjectMeta {
228                name: Some(pod_name),
229                namespace: Some(self.namespace.clone()),
230                labels: Some(labels),
231                annotations: Some({
232                    let mut ann = BTreeMap::new();
233                    ann.insert(
234                        "pod-security.kubernetes.io/enforce".to_string(),
235                        "restricted".to_string(),
236                    );
237                    ann
238                }),
239                ..Default::default()
240            },
241            spec: Some(pod_spec),
242            ..Default::default()
243        }
244    }
245
246    /// Create a NetworkPolicy that denies all ingress/egress for this pod
247    async fn create_network_policy(&self, client: &Client) -> Result<()> {
248        let np_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &self.namespace);
249
250        let pod_name = Self::pod_name_for(&self.name);
251        let np_name = format!("{}-deny-all", pod_name);
252
253        let mut match_labels = BTreeMap::new();
254        match_labels.insert("agentkernel/sandbox".to_string(), self.name.clone());
255
256        let np = NetworkPolicy {
257            metadata: ObjectMeta {
258                name: Some(np_name),
259                namespace: Some(self.namespace.clone()),
260                ..Default::default()
261            },
262            spec: Some(NetworkPolicySpec {
263                pod_selector: LabelSelector {
264                    match_labels: Some(match_labels),
265                    ..Default::default()
266                },
267                // Empty ingress and egress = deny all
268                ingress: Some(vec![]),
269                egress: Some(vec![]),
270                policy_types: Some(vec!["Ingress".to_string(), "Egress".to_string()]),
271            }),
272        };
273
274        np_api
275            .create(&PostParams::default(), &np)
276            .await
277            .context("Failed to create NetworkPolicy")?;
278
279        Ok(())
280    }
281
282    /// Delete the NetworkPolicy for this sandbox
283    async fn delete_network_policy(&self, client: &Client) -> Result<()> {
284        let np_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &self.namespace);
285        let pod_name = Self::pod_name_for(&self.name);
286        let np_name = format!("{}-deny-all", pod_name);
287
288        let _ = np_api.delete(&np_name, &DeleteParams::default()).await;
289        Ok(())
290    }
291
292    /// Wait for the pod to reach the Running phase.
293    /// Uses exponential backoff: 50ms → 100ms → 200ms → 500ms (capped).
294    async fn wait_for_running(&self, client: &Client, pod_name: &str) -> Result<()> {
295        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
296        let mut delay_ms: u64 = 50;
297        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120);
298
299        loop {
300            let pod = pods.get(pod_name).await?;
301            if let Some(status) = &pod.status
302                && let Some(phase) = &status.phase
303            {
304                match phase.as_str() {
305                    "Running" => return Ok(()),
306                    "Failed" | "Succeeded" => {
307                        bail!("Pod entered unexpected phase: {}", phase);
308                    }
309                    _ => {} // Pending, etc.
310                }
311            }
312
313            if tokio::time::Instant::now() >= deadline {
314                bail!("Timed out waiting for pod '{}' to start", pod_name);
315            }
316
317            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
318            delay_ms = (delay_ms * 2).min(500);
319        }
320    }
321}
322
323#[async_trait]
324impl Sandbox for KubernetesSandbox {
325    async fn start(&mut self, config: &SandboxConfig) -> Result<()> {
326        // Build K8s client
327        let orch_config = OrchestratorConfig {
328            namespace: self.namespace.clone(),
329            ..Default::default()
330        };
331        let client = Self::build_client(&orch_config).await?;
332
333        // Ensure namespace exists (only create if missing)
334        let ns_api: Api<k8s_openapi::api::core::v1::Namespace> = Api::all(client.clone());
335        if ns_api.get(&self.namespace).await.is_err() {
336            let _ = ns_api
337                .create(
338                    &PostParams::default(),
339                    &k8s_openapi::api::core::v1::Namespace {
340                        metadata: ObjectMeta {
341                            name: Some(self.namespace.clone()),
342                            ..Default::default()
343                        },
344                        ..Default::default()
345                    },
346                )
347                .await;
348        }
349
350        // Build and create the pod
351        let pod = self.build_pod_spec(config);
352        let pod_name = pod
353            .metadata
354            .name
355            .clone()
356            .unwrap_or_else(|| Self::pod_name_for(&self.name));
357
358        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
359        pods.create(&PostParams::default(), &pod)
360            .await
361            .context("Failed to create K8s pod")?;
362
363        // Create NetworkPolicy if network is disabled
364        self.network_disabled = !config.network;
365        if !config.network {
366            self.create_network_policy(&client).await?;
367            self.network_policy_created = true;
368        }
369
370        // Wait for the pod to be running
371        self.wait_for_running(&client, &pod_name).await?;
372
373        self.pod_name = Some(pod_name);
374        self.client = Some(client);
375        self.running = true;
376
377        Ok(())
378    }
379
380    async fn exec(&mut self, cmd: &[&str]) -> Result<ExecResult> {
381        self.exec_with_env(cmd, &[]).await
382    }
383
384    async fn exec_with_env(&mut self, cmd: &[&str], env: &[String]) -> Result<ExecResult> {
385        // Lazily initialize client and pod_name if needed (e.g., reconnecting to a running pod)
386        if self.client.is_none() {
387            let orch_config = OrchestratorConfig {
388                namespace: self.namespace.clone(),
389                ..Default::default()
390            };
391            let client = Self::build_client(&orch_config).await?;
392            self.client = Some(client);
393        }
394        if self.pod_name.is_none() {
395            self.pod_name = Some(Self::pod_name_for(&self.name));
396        }
397
398        let client = self.client.as_ref().unwrap();
399        let pod_name = self.pod_name.as_ref().unwrap();
400
401        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
402
403        // Wrap command with env if provided
404        let full_cmd: Vec<String> = if env.is_empty() {
405            cmd.iter().map(|s| s.to_string()).collect()
406        } else {
407            // Build: env KEY=VAL KEY2=VAL2 ... <original command>
408            let mut parts = vec!["env".to_string()];
409            parts.extend(env.iter().cloned());
410            parts.extend(cmd.iter().map(|s| s.to_string()));
411            parts
412        };
413
414        // Use the kube API for pod exec via WebSocket
415        let mut attached = pods
416            .exec(
417                pod_name,
418                full_cmd,
419                &kube::api::AttachParams::default()
420                    .container("sandbox")
421                    .stdout(true)
422                    .stderr(true),
423            )
424            .await
425            .context("Failed to exec in K8s pod")?;
426
427        // Read stdout and stderr concurrently
428        let mut stdout_reader = attached
429            .stdout()
430            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
431        let mut stderr_reader = attached
432            .stderr()
433            .ok_or_else(|| anyhow::anyhow!("No stderr"))?;
434
435        let mut stdout_buf = Vec::new();
436        let mut stderr_buf = Vec::new();
437
438        let (stdout_result, stderr_result) = tokio::join!(
439            stdout_reader.read_to_end(&mut stdout_buf),
440            stderr_reader.read_to_end(&mut stderr_buf),
441        );
442
443        stdout_result.context("Failed to read stdout")?;
444        stderr_result.context("Failed to read stderr")?;
445
446        let stdout = String::from_utf8_lossy(&stdout_buf).to_string();
447        let stderr = String::from_utf8_lossy(&stderr_buf).to_string();
448
449        // Wait for the process to complete; infer exit code from stderr
450        let _ = attached.join().await;
451        let exit_code = if stderr.is_empty() { 0 } else { 1 };
452
453        Ok(ExecResult {
454            exit_code,
455            stdout,
456            stderr,
457        })
458    }
459
460    async fn stop(&mut self) -> Result<()> {
461        // Lazily initialize client and pod_name if needed
462        if self.client.is_none() {
463            let orch_config = OrchestratorConfig {
464                namespace: self.namespace.clone(),
465                ..Default::default()
466            };
467            if let Ok(client) = Self::build_client(&orch_config).await {
468                self.client = Some(client);
469            }
470        }
471        if self.pod_name.is_none() {
472            self.pod_name = Some(Self::pod_name_for(&self.name));
473        }
474
475        if let (Some(client), Some(pod_name)) = (&self.client, &self.pod_name) {
476            let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
477
478            // Delete the pod
479            let _ = pods
480                .delete(pod_name, &DeleteParams::default())
481                .await
482                .context("Failed to delete K8s pod");
483
484            // Clean up NetworkPolicy if we created one
485            if self.network_policy_created {
486                let _ = self.delete_network_policy(client).await;
487            }
488        }
489
490        self.running = false;
491        self.pod_name = None;
492        Ok(())
493    }
494
495    fn name(&self) -> &str {
496        &self.name
497    }
498
499    fn backend_type(&self) -> BackendType {
500        BackendType::Kubernetes
501    }
502
503    fn is_running(&self) -> bool {
504        self.running
505    }
506
507    async fn write_file_unchecked(&mut self, path: &str, content: &[u8]) -> Result<()> {
508        use base64::Engine;
509        let encoded = base64::engine::general_purpose::STANDARD.encode(content);
510
511        // Create parent directory first
512        if let Some(parent) = std::path::Path::new(path).parent() {
513            let parent_str = parent.to_string_lossy();
514            if parent_str != "/" {
515                let mkdir_cmd = format!("mkdir -p '{}'", parent_str);
516                self.exec(&["sh", "-c", &mkdir_cmd]).await?;
517            }
518        }
519
520        // Decode base64 into the file
521        let write_cmd = format!("echo '{}' | base64 -d > '{}'", encoded, path);
522        let result = self.exec(&["sh", "-c", &write_cmd]).await?;
523
524        if !result.is_success() {
525            bail!("Failed to write file {}: {}", path, result.stderr);
526        }
527
528        Ok(())
529    }
530
531    async fn read_file_unchecked(&mut self, path: &str) -> Result<Vec<u8>> {
532        let read_cmd = format!("base64 '{}'", path);
533        let result = self.exec(&["sh", "-c", &read_cmd]).await?;
534
535        if !result.is_success() {
536            bail!("Failed to read file {}: {}", path, result.stderr);
537        }
538
539        use base64::Engine;
540        let decoded = base64::engine::general_purpose::STANDARD
541            .decode(result.stdout.trim())
542            .context("Failed to decode base64 file content")?;
543
544        Ok(decoded)
545    }
546
547    async fn remove_file_unchecked(&mut self, path: &str) -> Result<()> {
548        let rm_cmd = format!("rm -f '{}'", path);
549        self.exec(&["sh", "-c", &rm_cmd]).await?;
550        Ok(())
551    }
552
553    async fn mkdir_unchecked(&mut self, path: &str, recursive: bool) -> Result<()> {
554        let flag = if recursive { "-p" } else { "" };
555        let cmd = format!("mkdir {} '{}'", flag, path);
556        self.exec(&["sh", "-c", &cmd]).await?;
557        Ok(())
558    }
559
560    async fn attach(&mut self, shell: Option<&str>) -> Result<i32> {
561        let client = self
562            .client
563            .as_ref()
564            .ok_or_else(|| anyhow::anyhow!("K8s client not initialized"))?;
565        let pod_name = self
566            .pod_name
567            .as_ref()
568            .ok_or_else(|| anyhow::anyhow!("Pod not started"))?;
569
570        let shell = shell.unwrap_or("/bin/sh");
571        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
572
573        let mut attached = pods
574            .exec(
575                pod_name,
576                vec![shell.to_string()],
577                &kube::api::AttachParams::default()
578                    .container("sandbox")
579                    .stdin(true)
580                    .stdout(true)
581                    .stderr(true)
582                    .tty(true),
583            )
584            .await
585            .context("Failed to attach to K8s pod")?;
586
587        // Bridge stdin/stdout for interactive use
588        let mut stdin_writer = attached
589            .stdin()
590            .ok_or_else(|| anyhow::anyhow!("No stdin"))?;
591        let mut stdout_reader = attached
592            .stdout()
593            .ok_or_else(|| anyhow::anyhow!("No stdout"))?;
594
595        let stdin_handle = tokio::spawn(async move {
596            let mut host_stdin = tokio::io::stdin();
597            let _ = tokio::io::copy(&mut host_stdin, &mut stdin_writer).await;
598        });
599
600        let stdout_handle = tokio::spawn(async move {
601            let mut host_stdout = tokio::io::stdout();
602            let _ = tokio::io::copy(&mut stdout_reader, &mut host_stdout).await;
603        });
604
605        // Wait for either to finish
606        tokio::select! {
607            _ = stdin_handle => {},
608            _ = stdout_handle => {},
609        }
610
611        Ok(0)
612    }
613
614    async fn inject_files(&mut self, files: &[super::FileInjection]) -> Result<()> {
615        for file in files {
616            // Create parent directory if needed
617            if let Some(parent) = std::path::Path::new(&file.dest).parent() {
618                let parent_str = parent.to_string_lossy();
619                if parent_str != "/" {
620                    self.mkdir(&parent_str, true).await?;
621                }
622            }
623            // Write the file
624            self.write_file(&file.dest, &file.content).await?;
625        }
626        Ok(())
627    }
628}
629
/// Expand a leading tilde (`~` or `~/…`) to the value of `$HOME`.
///
/// Returns the path unchanged when it has no tilde prefix or when `HOME` is
/// unset. Forms like `~user/…` are intentionally not expanded.
fn tilde_expand(path: &str) -> String {
    let Some(home) = std::env::var_os("HOME") else {
        // Without HOME there is nothing to substitute.
        return path.to_string();
    };
    let home = home.to_string_lossy();
    if path == "~" {
        // Bare "~" also expands (the previous version only handled "~/…").
        return home.into_owned();
    }
    if let Some(rest) = path.strip_prefix("~/") {
        return format!("{}/{}", home, rest);
    }
    path.to_string()
}