use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use serde::Deserialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Command;
use tracing::{debug, trace, warn};
/// Top-level kubeconfig document as deserialized from YAML.
///
/// Unknown keys are ignored. The three lists default to empty when the key is
/// absent from the document.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Kubeconfig {
    /// Schema version of the document (the `apiVersion` key).
    // kubeconfig mixes key styles: `apiVersion` is camelCase while
    // `current-context` is kebab-case. The struct-level `rename_all` would look
    // for `api-version`, so the real key is named explicitly; the kebab-case
    // spelling stays accepted as an alias for backward compatibility.
    #[serde(default, rename = "apiVersion", alias = "api-version")]
    pub api_version: String,
    /// Document kind (the `kind` key); empty when absent.
    #[serde(default)]
    pub kind: String,
    /// Name of the context used by [`Kubeconfig::resolve_current`] (`current-context`).
    pub current_context: Option<String>,
    /// Named cluster definitions.
    #[serde(default)]
    pub clusters: Vec<NamedCluster>,
    /// Named contexts, each pairing a cluster with a user.
    #[serde(default)]
    pub contexts: Vec<NamedContext>,
    /// Named user (credential) definitions.
    #[serde(default)]
    pub users: Vec<NamedUser>,
}
/// One entry of the `clusters` list: a cluster definition plus the name it is looked up by.
#[derive(Debug, Clone, Deserialize)]
pub struct NamedCluster {
/// Name that `Kubeconfig::get_cluster` compares against.
pub name: String,
/// Connection settings for this cluster.
pub cluster: ClusterConfig,
}
/// Connection settings for a single cluster. Keys are read in kebab-case.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct ClusterConfig {
/// Server address; copied unchanged into `ResolvedKubeConfig::server`.
pub server: String,
/// Base64-encoded CA certificate. When present it is used instead of `certificate_authority`.
pub certificate_authority_data: Option<String>,
/// Path to a CA certificate file; a leading `~/` is expanded before reading.
pub certificate_authority: Option<String>,
/// Copied unchanged into `ResolvedKubeConfig::insecure_skip_tls_verify`; `false` when the key is absent.
#[serde(default)]
pub insecure_skip_tls_verify: bool,
}
/// One entry of the `contexts` list: a context definition plus the name it is looked up by.
#[derive(Debug, Clone, Deserialize)]
pub struct NamedContext {
/// Name that `Kubeconfig::get_context` compares against.
pub name: String,
/// The cluster/user/namespace triple this context selects.
pub context: ContextConfig,
}
/// A context: which cluster to talk to, as which user, and optionally in which namespace.
#[derive(Debug, Clone, Deserialize)]
pub struct ContextConfig {
/// Name of the `clusters` entry to use; resolved via `Kubeconfig::get_cluster`.
pub cluster: String,
/// Name of the `users` entry to use; resolved via `Kubeconfig::get_user`.
pub user: String,
/// Optional namespace, copied unchanged into `ResolvedKubeConfig::namespace`.
pub namespace: Option<String>,
}
/// One entry of the `users` list: credential settings plus the name they are looked up by.
#[derive(Debug, Clone, Deserialize)]
pub struct NamedUser {
/// Name that `Kubeconfig::get_user` compares against.
pub name: String,
/// Credential settings for this user.
pub user: UserConfig,
}
/// Credential settings for a single user.
///
/// Every field is optional; `Kubeconfig::resolve_auth` decides which one is
/// used. Note that the derived `Debug` output includes secret material
/// (token, key data, password).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct UserConfig {
    /// Bearer token given inline.
    pub token: Option<String>,
    /// Path to a file containing a bearer token.
    // Unlike its kebab-case neighbours, the kubeconfig key for this field is
    // the camelCase `tokenFile`. `rename_all` would look for `token-file` and
    // silently miss it, so the real key is named explicitly; the kebab-case
    // spelling stays accepted as an alias for backward compatibility.
    #[serde(rename = "tokenFile", alias = "token-file")]
    pub token_file: Option<String>,
    /// Base64-encoded client certificate; used instead of `client_certificate` when present.
    pub client_certificate_data: Option<String>,
    /// Path to a client certificate file.
    pub client_certificate: Option<String>,
    /// Base64-encoded client private key; used instead of `client_key` when present.
    pub client_key_data: Option<String>,
    /// Path to a client private key file.
    pub client_key: Option<String>,
    /// Basic-auth user name. Parsed, but not consulted by `resolve_auth`.
    pub username: Option<String>,
    /// Basic-auth password. Parsed, but not consulted by `resolve_auth`.
    pub password: Option<String>,
    /// External credential command; takes precedence over every other method.
    pub exec: Option<ExecConfig>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecConfig {
pub api_version: Option<String>,
pub command: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default)]
pub env: Vec<ExecEnv>,
#[serde(default)]
pub provide_cluster_info: bool,
}
/// One environment variable set on the exec credential command before it is run.
#[derive(Debug, Clone, Deserialize)]
pub struct ExecEnv {
/// Variable name.
pub name: String,
/// Variable value.
pub value: String,
}
/// JSON document parsed from the standard output of an exec credential command.
/// Keys are read in camelCase.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecCredential {
/// `apiVersion` reported by the command, if any.
pub api_version: Option<String>,
/// `kind` reported by the command, if any.
pub kind: Option<String>,
/// The credentials themselves; a response without it is rejected by `resolve_exec_auth`.
pub status: Option<ExecCredentialStatus>,
}
/// Credentials returned by an exec credential command. Keys are read in camelCase.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExecCredentialStatus {
/// Expiry of the credentials as reported by the command; kept as an unparsed string.
pub expiration_timestamp: Option<String>,
/// Bearer token; preferred over the certificate pair when both are present.
pub token: Option<String>,
/// Client certificate data; only used together with `client_key_data`.
pub client_certificate_data: Option<String>,
/// Client private key data; only used together with `client_certificate_data`.
pub client_key_data: Option<String>,
}
/// Authentication material produced by resolving a kubeconfig user.
///
/// NOTE(review): `Debug` is derived, so debug-formatting this value prints the
/// token / key bytes.
#[derive(Debug, Clone)]
pub enum KubeAuth {
/// Bearer token (given inline, read from a token file, or returned by an exec command).
Token(String),
/// Client certificate and private key as decoded / file bytes.
ClientCert { cert: Vec<u8>, key: Vec<u8> },
/// No usable authentication method was found for the user.
None,
}
/// Result of resolving one context: everything needed to connect to its cluster.
#[derive(Debug, Clone)]
pub struct ResolvedKubeConfig {
/// Server address, copied from the cluster entry.
pub server: String,
/// CA certificate bytes (decoded inline data or file contents), if configured.
pub ca_cert: Option<Vec<u8>>,
/// Authentication resolved from the user entry.
pub auth: KubeAuth,
/// Namespace from the context entry, if set.
pub namespace: Option<String>,
/// Copied from the cluster entry's `insecure-skip-tls-verify` setting.
pub insecure_skip_tls_verify: bool,
}
/// Errors produced while loading or resolving a kubeconfig.
#[derive(Debug, thiserror::Error)]
pub enum KubeconfigError {
/// An I/O error. Besides reading the kubeconfig file itself, this is also what
/// a failed read of a referenced CA / certificate / key / token file converts into.
#[error("Failed to read kubeconfig file: {0}")]
ReadError(#[from] std::io::Error),
/// The kubeconfig text could not be deserialized from YAML.
#[error("Failed to parse kubeconfig YAML: {0}")]
ParseError(#[from] serde_yaml::Error),
/// `current-context` is not set.
#[error("No current context set in kubeconfig")]
NoCurrentContext,
/// No `contexts` entry has the given name.
#[error("Context '{0}' not found")]
ContextNotFound(String),
/// No `clusters` entry has the given name.
#[error("Cluster '{0}' not found")]
ClusterNotFound(String),
/// No `users` entry has the given name.
#[error("User '{0}' not found")]
UserNotFound(String),
/// Inline certificate / key data was not valid base64.
#[error("Failed to decode base64: {0}")]
Base64Error(#[from] base64::DecodeError),
/// The exec credential command could not be started or exited unsuccessfully.
#[error("Exec command failed: {0}")]
ExecError(String),
/// The exec credential command's output was not valid JSON or carried no usable credentials.
#[error("Failed to parse exec credential: {0}")]
ExecParseError(String),
}
impl Kubeconfig {
    /// Reads the file at `path` and parses it as a kubeconfig.
    ///
    /// # Errors
    /// [`KubeconfigError::ReadError`] if the file cannot be read as UTF-8 text,
    /// [`KubeconfigError::ParseError`] if the YAML does not match the schema.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, KubeconfigError> {
        let path = path.as_ref();
        debug!(path = %path.display(), "Loading kubeconfig");
        let content = std::fs::read_to_string(path)?;
        Self::from_str(&content)
    }

    /// Loads `$HOME/.kube/config`.
    ///
    /// When `HOME` is unset (or not valid Unicode) `/root` is used as the home
    /// directory.
    ///
    /// # Errors
    /// Same as [`Kubeconfig::from_file`].
    pub fn from_default_location() -> Result<Self, KubeconfigError> {
        let home = std::env::var("HOME").unwrap_or_else(|_| "/root".to_string());
        let path = PathBuf::from(home).join(".kube").join("config");
        Self::from_file(path)
    }

    /// Parses kubeconfig YAML from a string.
    ///
    /// # Errors
    /// [`KubeconfigError::ParseError`] if deserialization fails.
    // Deliberately an inherent method rather than a `FromStr` impl, hence the
    // lint allowance.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(content: &str) -> Result<Self, KubeconfigError> {
        let config: Kubeconfig = serde_yaml::from_str(content)?;
        trace!(
            clusters = config.clusters.len(),
            contexts = config.contexts.len(),
            users = config.users.len(),
            "Parsed kubeconfig"
        );
        Ok(config)
    }

    /// Returns the name of the current context.
    ///
    /// # Errors
    /// [`KubeconfigError::NoCurrentContext`] if `current-context` is absent or
    /// an empty string.
    pub fn current_context(&self) -> Result<String, KubeconfigError> {
        // An empty `current-context: ""` means "no context selected"; treating
        // it as a name would surface as a confusing `ContextNotFound("")`.
        self.current_context
            .as_deref()
            .filter(|name| !name.is_empty())
            .map(str::to_owned)
            .ok_or(KubeconfigError::NoCurrentContext)
    }

    /// Looks up a context by name.
    ///
    /// # Errors
    /// [`KubeconfigError::ContextNotFound`] if no entry has that name.
    pub fn get_context(&self, name: &str) -> Result<&ContextConfig, KubeconfigError> {
        self.contexts
            .iter()
            .find(|c| c.name == name)
            .map(|c| &c.context)
            .ok_or_else(|| KubeconfigError::ContextNotFound(name.to_string()))
    }

    /// Looks up a cluster by name.
    ///
    /// # Errors
    /// [`KubeconfigError::ClusterNotFound`] if no entry has that name.
    pub fn get_cluster(&self, name: &str) -> Result<&ClusterConfig, KubeconfigError> {
        self.clusters
            .iter()
            .find(|c| c.name == name)
            .map(|c| &c.cluster)
            .ok_or_else(|| KubeconfigError::ClusterNotFound(name.to_string()))
    }

    /// Looks up a user by name.
    ///
    /// # Errors
    /// [`KubeconfigError::UserNotFound`] if no entry has that name.
    pub fn get_user(&self, name: &str) -> Result<&UserConfig, KubeconfigError> {
        self.users
            .iter()
            .find(|u| u.name == name)
            .map(|u| &u.user)
            .ok_or_else(|| KubeconfigError::UserNotFound(name.to_string()))
    }

    /// Resolves the context named by `current-context`.
    ///
    /// # Errors
    /// [`KubeconfigError::NoCurrentContext`] or anything
    /// [`Kubeconfig::resolve_context`] returns.
    pub fn resolve_current(&self) -> Result<ResolvedKubeConfig, KubeconfigError> {
        let context_name = self.current_context()?;
        self.resolve_context(&context_name)
    }

    /// Resolves `context_name` into concrete connection settings.
    ///
    /// Looks up the context, its cluster and its user, loads the CA
    /// certificate and resolves the user's credentials. Resolving credentials
    /// may read files and run an external command.
    ///
    /// # Errors
    /// A not-found error for a missing context/cluster/user, or any error from
    /// loading certificate data or resolving authentication.
    pub fn resolve_context(
        &self,
        context_name: &str,
    ) -> Result<ResolvedKubeConfig, KubeconfigError> {
        let context = self.get_context(context_name)?;
        let cluster = self.get_cluster(&context.cluster)?;
        let user = self.get_user(&context.user)?;
        debug!(
            context = context_name,
            cluster = &context.cluster,
            user = &context.user,
            server = &cluster.server,
            "Resolving kubeconfig context"
        );
        let ca_cert = self.resolve_ca_cert(cluster)?;
        let auth = self.resolve_auth(user)?;
        Ok(ResolvedKubeConfig {
            server: cluster.server.clone(),
            ca_cert,
            auth,
            namespace: context.namespace.clone(),
            insecure_skip_tls_verify: cluster.insecure_skip_tls_verify,
        })
    }

    /// Loads the cluster CA certificate: inline data first, then the file path.
    fn resolve_ca_cert(&self, cluster: &ClusterConfig) -> Result<Option<Vec<u8>>, KubeconfigError> {
        Self::load_inline_or_file(
            cluster.certificate_authority_data.as_deref(),
            cluster.certificate_authority.as_deref(),
        )
    }

    /// Picks the authentication method for `user`.
    ///
    /// Precedence: exec command, inline token, token file, client certificate
    /// pair. Returns [`KubeAuth::None`] (with a warning) when none applies.
    fn resolve_auth(&self, user: &UserConfig) -> Result<KubeAuth, KubeconfigError> {
        if let Some(exec) = &user.exec {
            return self.resolve_exec_auth(exec);
        }
        if let Some(token) = &user.token {
            return Ok(KubeAuth::Token(token.clone()));
        }
        if let Some(token_file) = &user.token_file {
            let expanded = expand_path(token_file);
            // Token files commonly end with a newline; strip surrounding whitespace.
            let token = std::fs::read_to_string(&expanded)?.trim().to_string();
            return Ok(KubeAuth::Token(token));
        }
        let cert = self.resolve_client_cert(user)?;
        let key = self.resolve_client_key(user)?;
        match (cert, key) {
            (Some(cert), Some(key)) => Ok(KubeAuth::ClientCert { cert, key }),
            (None, None) => {
                warn!("No authentication method found in kubeconfig user");
                Ok(KubeAuth::None)
            }
            // Exactly one half of the pair is configured: unusable on its own.
            _ => {
                warn!("Client certificate and key must both be set; ignoring incomplete pair");
                Ok(KubeAuth::None)
            }
        }
    }

    /// Loads the user's client certificate: inline data first, then the file path.
    fn resolve_client_cert(&self, user: &UserConfig) -> Result<Option<Vec<u8>>, KubeconfigError> {
        Self::load_inline_or_file(
            user.client_certificate_data.as_deref(),
            user.client_certificate.as_deref(),
        )
    }

    /// Loads the user's client private key: inline data first, then the file path.
    fn resolve_client_key(&self, user: &UserConfig) -> Result<Option<Vec<u8>>, KubeconfigError> {
        Self::load_inline_or_file(user.client_key_data.as_deref(), user.client_key.as_deref())
    }

    /// Runs the exec credential command and converts its JSON output into a [`KubeAuth`].
    ///
    /// The command is run with the configured arguments and extra environment
    /// variables, and its stdout is parsed as an [`ExecCredential`]. A token
    /// is preferred over a certificate pair.
    ///
    /// NOTE(review): no `KUBERNETES_EXEC_INFO` variable is passed to the
    /// command, and `api_version` / `provide_cluster_info` are not used here.
    ///
    /// # Errors
    /// [`KubeconfigError::ExecError`] if the command cannot be started or exits
    /// unsuccessfully; [`KubeconfigError::ExecParseError`] if its output is not
    /// valid JSON or contains neither a token nor a certificate pair.
    fn resolve_exec_auth(&self, exec: &ExecConfig) -> Result<KubeAuth, KubeconfigError> {
        debug!(command = &exec.command, "Executing credential command");
        let mut cmd = Command::new(&exec.command);
        cmd.args(&exec.args);
        cmd.envs(exec.env.iter().map(|var| (&var.name, &var.value)));
        let output = cmd.output().map_err(|e| {
            KubeconfigError::ExecError(format!("Failed to execute {}: {}", exec.command, e))
        })?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(KubeconfigError::ExecError(format!(
                "Command {} failed: {}",
                exec.command, stderr
            )));
        }
        let stdout = String::from_utf8_lossy(&output.stdout);
        let cred: ExecCredential = serde_json::from_str(&stdout).map_err(|e| {
            KubeconfigError::ExecParseError(format!("Failed to parse exec output: {}", e))
        })?;
        if let Some(status) = cred.status {
            if let Some(token) = status.token {
                return Ok(KubeAuth::Token(token));
            }
            if let (Some(cert_data), Some(key_data)) =
                (status.client_certificate_data, status.client_key_data)
            {
                let cert = Self::exec_pem_bytes(cert_data)?;
                let key = Self::exec_pem_bytes(key_data)?;
                return Ok(KubeAuth::ClientCert { cert, key });
            }
        }
        Err(KubeconfigError::ExecParseError(
            "Exec credential response missing token or certificate".to_string(),
        ))
    }

    /// Loads data that may be given inline (base64) or as a file path.
    ///
    /// Inline data wins when both are present; `Ok(None)` when neither is set.
    /// A leading `~/` in `path` is expanded before reading.
    fn load_inline_or_file(
        inline: Option<&str>,
        path: Option<&str>,
    ) -> Result<Option<Vec<u8>>, KubeconfigError> {
        if let Some(data) = inline {
            return Self::decode_base64(data).map(Some);
        }
        match path {
            Some(path) => Ok(Some(std::fs::read(expand_path(path))?)),
            None => Ok(None),
        }
    }

    /// Decodes standard base64, ignoring embedded ASCII whitespace.
    // Line-wrapped data or a trailing newline (e.g. from a YAML block scalar)
    // would otherwise be rejected by the strict decoder.
    fn decode_base64(data: &str) -> Result<Vec<u8>, KubeconfigError> {
        let compact: Vec<u8> = data
            .bytes()
            .filter(|b| !b.is_ascii_whitespace())
            .collect();
        Ok(BASE64.decode(compact)?)
    }

    /// Converts certificate/key data from an exec credential into bytes.
    // The ExecCredential `clientCertificateData` / `clientKeyData` fields carry
    // PEM text, not base64, so PEM input is passed through unchanged.
    // Anything else is still base64-decoded, so output that worked before
    // this fix keeps working.
    fn exec_pem_bytes(data: String) -> Result<Vec<u8>, KubeconfigError> {
        if data.trim_start().starts_with("-----BEGIN") {
            Ok(data.into_bytes())
        } else {
            Self::decode_base64(&data)
        }
    }
}
/// Expands a leading `~/` in `path` to the home directory.
///
/// The home directory is taken from `HOME`, falling back to `/root` when the
/// variable cannot be read. Any path not starting with `~/` (including a bare
/// `~` or `~user/...`) is returned unchanged.
fn expand_path(path: &str) -> PathBuf {
    match path.strip_prefix("~/") {
        Some(rest) => {
            let home_dir = std::env::var("HOME").unwrap_or_else(|_| "/root".to_string());
            PathBuf::from(home_dir).join(rest)
        }
        None => PathBuf::from(path),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The YAML fixtures must keep their nesting indentation: without it the
    // `cluster:` / `context:` / `user:` mappings collapse into top-level keys
    // and the documents no longer describe a cluster, context and user.

    /// Kubeconfig using inline CA data and an inline client certificate pair.
    const SAMPLE_KUBECONFIG: &str = r#"
apiVersion: v1
kind: Config
current-context: docker-desktop
clusters:
- cluster:
    certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCVENDQWUyZ0F3SUJBZ0lSQU5oV1hCWTBNREMKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQ==
    server: https://kubernetes.docker.internal:6443
  name: docker-desktop
contexts:
- context:
    cluster: docker-desktop
    user: docker-desktop
    namespace: default
  name: docker-desktop
users:
- name: docker-desktop
  user:
    client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCVENDQWUyZ0F3SUJBZ0lSQU5oV1hCWTBNREMKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQ==
    client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBejZ3ZWlPZkp6NW8KLS0tLS1FTkQgUlNBIFBSSVZBVEUgS0VZLS0tLS0=
"#;

    /// Kubeconfig using bearer-token auth and `insecure-skip-tls-verify`.
    const TOKEN_KUBECONFIG: &str = r#"
apiVersion: v1
kind: Config
current-context: my-cluster
clusters:
- cluster:
    server: https://api.my-cluster.example.com:6443
    insecure-skip-tls-verify: true
  name: my-cluster
contexts:
- context:
    cluster: my-cluster
    user: my-user
  name: my-cluster
users:
- name: my-user
  user:
    token: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test-token
"#;

    #[test]
    fn test_parse_kubeconfig() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        assert_eq!(config.current_context, Some("docker-desktop".to_string()));
        assert_eq!(config.clusters.len(), 1);
        assert_eq!(config.contexts.len(), 1);
        assert_eq!(config.users.len(), 1);
    }

    #[test]
    fn test_get_context() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        let context = config.get_context("docker-desktop").unwrap();
        assert_eq!(context.cluster, "docker-desktop");
        assert_eq!(context.user, "docker-desktop");
        assert_eq!(context.namespace, Some("default".to_string()));
    }

    #[test]
    fn test_get_cluster() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        let cluster = config.get_cluster("docker-desktop").unwrap();
        assert_eq!(cluster.server, "https://kubernetes.docker.internal:6443");
        assert!(cluster.certificate_authority_data.is_some());
    }

    #[test]
    fn test_context_not_found() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        let result = config.get_context("nonexistent");
        assert!(matches!(result, Err(KubeconfigError::ContextNotFound(_))));
    }

    #[test]
    fn test_cluster_not_found() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        let result = config.get_cluster("nonexistent");
        assert!(matches!(result, Err(KubeconfigError::ClusterNotFound(_))));
    }

    #[test]
    fn test_user_not_found() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        let result = config.get_user("nonexistent");
        assert!(matches!(result, Err(KubeconfigError::UserNotFound(_))));
    }

    #[test]
    fn test_resolve_current_context() {
        let config = Kubeconfig::from_str(SAMPLE_KUBECONFIG).unwrap();
        let resolved = config.resolve_current().unwrap();
        assert_eq!(resolved.server, "https://kubernetes.docker.internal:6443");
        assert!(resolved.ca_cert.is_some());
        assert!(matches!(resolved.auth, KubeAuth::ClientCert { .. }));
        assert_eq!(resolved.namespace, Some("default".to_string()));
    }

    #[test]
    fn test_token_auth() {
        let config = Kubeconfig::from_str(TOKEN_KUBECONFIG).unwrap();
        let resolved = config.resolve_current().unwrap();
        assert_eq!(resolved.server, "https://api.my-cluster.example.com:6443");
        assert!(resolved.insecure_skip_tls_verify);
        match resolved.auth {
            KubeAuth::Token(token) => {
                assert!(token.starts_with("eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9"));
            }
            _ => panic!("Expected token auth"),
        }
    }

    #[test]
    fn test_expand_path() {
        // Read HOME instead of overwriting it: tests run in parallel threads
        // and mutating the process environment is not thread-safe.
        let home = std::env::var("HOME").unwrap_or_else(|_| "/root".to_string());
        assert_eq!(
            expand_path("~/.kube/config"),
            PathBuf::from(home).join(".kube/config")
        );
        assert_eq!(
            expand_path("/etc/kubernetes/config"),
            PathBuf::from("/etc/kubernetes/config")
        );
    }

    #[test]
    fn test_no_current_context() {
        let config_str = r#"
apiVersion: v1
kind: Config
clusters: []
contexts: []
users: []
"#;
        let config = Kubeconfig::from_str(config_str).unwrap();
        let result = config.current_context();
        assert!(matches!(result, Err(KubeconfigError::NoCurrentContext)));
    }
}