use futures::future::BoxFuture;
use http::{
HeaderValue, Request,
header::{AUTHORIZATION, InvalidHeaderValue},
};
use jiff::{SignedDuration, Timestamp};
use jsonpath_rust::JsonPath;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
process::Command,
sync::Arc,
};
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};
use tower::{BoxError, filter::AsyncPredicate};
use crate::config::{AuthInfo, AuthProviderConfig, ExecAuthCluster, ExecConfig, ExecInteractiveMode};
#[cfg(feature = "oauth")] mod oauth;
#[cfg(feature = "oauth")] pub use oauth::Error as OAuthError;
#[cfg(feature = "oidc")] mod oidc;
#[cfg(feature = "oidc")] pub use oidc::errors as oidc_errors;
#[cfg(target_os = "windows")] use std::os::windows::process::CommandExt;
/// Errors produced while resolving, executing, or refreshing client credentials.
#[derive(Error, Debug)]
pub enum Error {
/// Basic-auth credentials could not be encoded as a valid header value.
#[error("invalid basic auth: {0}")]
InvalidBasicAuth(#[source] InvalidHeaderValue),
/// The bearer token contains bytes that are not allowed in a header value.
#[error("invalid bearer token: {0}")]
InvalidBearerToken(#[source] InvalidHeaderValue),
/// Re-running authentication for a refreshable token yielded a credential
/// that cannot be refreshed (none, basic, static bearer, or certificate).
#[error("tried to refresh a token and got a non-refreshable token response")]
UnrefreshableTokenResponse,
/// The exec plugin returned an `ExecCredential` without a `status` section.
#[error("exec-plugin response did not contain a status")]
ExecPluginFailed,
/// An expiration timestamp (from an exec plugin or provider config) failed to parse.
#[error("malformed token expiration date: {0}")]
MalformedTokenExpirationDate(#[source] jiff::Error),
/// The exec plugin process could not be started.
#[error("unable to run auth exec: {0}")]
AuthExecStart(#[source] std::io::Error),
/// An auth command ran but exited with a non-success status.
#[error("auth exec command '{cmd}' failed with status {status}: {out:?}")]
AuthExecRun {
/// The command line that was executed, rendered for diagnostics.
cmd: String,
/// Exit status reported by the process.
status: std::process::ExitStatus,
/// Full captured output of the process.
out: std::process::Output,
},
/// The exec plugin's stdout was not a valid `ExecCredential` JSON document.
#[error("failed to parse auth exec output: {0}")]
AuthExecParse(#[source] serde_json::Error),
/// The `ExecCredential` passed to the plugin could not be serialized.
#[error("failed to serialize input: {0}")]
AuthExecSerialize(#[source] serde_json::Error),
/// Catch-all for exec/provider authentication failures described by a message.
#[error("failed exec auth: {0}")]
AuthExec(String),
/// The token file at the given path could not be read.
#[error("failed to read token file '{1:?}': {0}")]
ReadTokenFile(#[source] std::io::Error, PathBuf),
/// The provider command's output was not valid JSON although a `token-key` was configured.
#[error("failed to parse token-key")]
ParseTokenKey(#[source] serde_json::Error),
/// The exec configuration did not name a command to run.
#[error("command must be specified to use exec authentication plugin")]
MissingCommand,
/// OAuth-based authentication failed.
#[cfg(feature = "oauth")]
#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
#[error("failed OAuth: {0}")]
OAuth(#[source] OAuthError),
/// OIDC-based authentication failed.
#[cfg(feature = "oidc")]
#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
#[error("failed OIDC: {0}")]
Oidc(#[source] oidc_errors::Error),
/// `provideClusterInfo` was requested but no cluster information was available.
#[error("Cluster spec must be populated when `provideClusterInfo` is true")]
ExecMissingClusterInfo,
/// No usable native root CA certificates were found.
// NOTE(review): not constructed anywhere in this file; presumably raised by TLS setup elsewhere.
#[error("No valid native root CA certificates found")]
NoValidNativeRootCA(#[source] std::io::Error),
}
/// The credential resolved from an `AuthInfo`.
#[derive(Debug, Clone)]
pub(crate) enum Auth {
/// No credential was configured or produced.
None,
/// Username and password.
Basic(String, SecretString),
/// A static bearer token with no known expiry.
Bearer(SecretString),
/// A bearer token that can be re-obtained when it is about to expire.
RefreshableToken(RefreshableToken),
/// Client certificate data, client key data, and an optional expiration,
/// as returned in an exec plugin's status.
Certificate(String, SecretString, Option<Timestamp>),
}
/// A bearer token read from a file, cached in memory and periodically re-read.
#[derive(Debug)]
pub struct TokenFile {
// Location of the file the token is (re-)read from.
path: PathBuf,
// Most recently read file contents.
token: SecretString,
// Instant after which the cached contents are treated as stale and the file is re-read.
expires_at: Timestamp,
}
impl TokenFile {
fn new<P: AsRef<Path>>(path: P) -> Result<TokenFile, Error> {
let token = std::fs::read_to_string(&path)
.map_err(|source| Error::ReadTokenFile(source, path.as_ref().to_owned()))?;
Ok(Self {
path: path.as_ref().to_owned(),
token: SecretString::from(token),
expires_at: Timestamp::now() + SIXTY_SEC,
})
}
fn is_expiring(&self) -> bool {
Timestamp::now() + TEN_SEC > self.expires_at
}
fn cached_token(&self) -> Option<&str> {
(!self.is_expiring()).then(|| self.token.expose_secret())
}
fn token(&mut self) -> &str {
if self.is_expiring() {
if let Ok(token) = std::fs::read_to_string(&self.path) {
self.token = SecretString::from(token);
}
self.expires_at = Timestamp::now() + SIXTY_SEC;
}
self.token.expose_secret()
}
}
/// Safety margin: a cached file token is treated as expiring once it is within
/// this long of its refresh deadline.
pub const TEN_SEC: SignedDuration = SignedDuration::from_secs(10);
// Cache lifetime for file tokens, and the lead time before expiry at which
// exec/provider tokens are refreshed.
const SIXTY_SEC: SignedDuration = SignedDuration::from_secs(60);
/// A bearer token source that can produce a fresh token when the current one expires.
///
/// Cloning is cheap: every variant shares its state behind an `Arc`.
#[derive(Debug, Clone)]
pub enum RefreshableToken {
/// Token obtained by running a command; holds the current token, its expiry,
/// and the `AuthInfo` used to re-run authentication.
Exec(Arc<Mutex<(SecretString, Timestamp, AuthInfo)>>),
/// Token read from a file that is periodically re-read.
File(Arc<RwLock<TokenFile>>),
/// Token provided by the GCP OAuth source.
#[cfg(feature = "oauth")]
GcpOauth(Arc<Mutex<oauth::Gcp>>),
/// ID token provided by the OIDC source.
#[cfg(feature = "oidc")]
Oidc(Arc<Mutex<oidc::Oidc>>),
}
impl<B> AsyncPredicate<Request<B>> for RefreshableToken
where
B: http_body::Body + Send + 'static,
{
type Future = BoxFuture<'static, Result<Request<B>, BoxError>>;
type Request = Request<B>;
fn check(&mut self, mut request: Self::Request) -> Self::Future {
let refreshable = self.clone();
Box::pin(async move {
refreshable.to_header().await.map_err(Into::into).map(|value| {
request.headers_mut().insert(AUTHORIZATION, value);
request
})
})
}
}
impl RefreshableToken {
async fn to_header(&self) -> Result<HeaderValue, Error> {
match self {
RefreshableToken::Exec(data) => {
let mut locked_data = data.lock().await;
if Timestamp::now() + SIXTY_SEC >= locked_data.1 {
let auth_info = locked_data.2.clone();
let auth = tokio::task::spawn_blocking(move || Auth::try_from(&auth_info))
.await
.map_err(|e| Error::AuthExec(format!("failed to spawn blocking auth task: {e}")))??;
match auth {
Auth::None | Auth::Basic(_, _) | Auth::Bearer(_) | Auth::Certificate(_, _, _) => {
return Err(Error::UnrefreshableTokenResponse);
}
Auth::RefreshableToken(RefreshableToken::Exec(d)) => {
let (new_token, new_expire, new_info) = Arc::try_unwrap(d)
.expect("Unable to unwrap Arc, this is likely a programming error")
.into_inner();
locked_data.0 = new_token;
locked_data.1 = new_expire;
locked_data.2 = new_info;
}
Auth::RefreshableToken(RefreshableToken::File(_)) => unreachable!(),
#[cfg(feature = "oauth")]
Auth::RefreshableToken(RefreshableToken::GcpOauth(_)) => unreachable!(),
#[cfg(feature = "oidc")]
Auth::RefreshableToken(RefreshableToken::Oidc(_)) => unreachable!(),
}
}
bearer_header(locked_data.0.expose_secret())
}
RefreshableToken::File(token_file) => {
let guard = token_file.read().await;
if let Some(header) = guard.cached_token().map(bearer_header) {
return header;
}
drop(guard);
bearer_header(token_file.write().await.token())
}
#[cfg(feature = "oauth")]
RefreshableToken::GcpOauth(data) => {
let gcp_oauth = data.lock().await;
let token = (*gcp_oauth).token().await.map_err(Error::OAuth)?;
bearer_header(&token.access_token)
}
#[cfg(feature = "oidc")]
RefreshableToken::Oidc(oidc) => {
let token = oidc.lock().await.id_token().await.map_err(Error::Oidc)?;
bearer_header(&token)
}
}
}
}
fn bearer_header(token: &str) -> Result<HeaderValue, Error> {
let mut value = HeaderValue::try_from(format!("Bearer {token}")).map_err(Error::InvalidBearerToken)?;
value.set_sensitive(true);
Ok(value)
}
impl TryFrom<&AuthInfo> for Auth {
type Error = Error;
fn try_from(auth_info: &AuthInfo) -> Result<Self, Self::Error> {
if let Some(provider) = &auth_info.auth_provider {
match token_from_provider(provider)? {
#[cfg(feature = "oidc")]
ProviderToken::Oidc(oidc) => {
return Ok(Self::RefreshableToken(RefreshableToken::Oidc(Arc::new(
Mutex::new(oidc),
))));
}
#[cfg(not(feature = "oidc"))]
ProviderToken::Oidc(token) => {
return Ok(Self::Bearer(SecretString::from(token)));
}
ProviderToken::GcpCommand(token, Some(expiry)) => {
let mut info = auth_info.clone();
let mut provider = provider.clone();
provider.config.insert("access-token".into(), token.clone());
provider.config.insert("expiry".into(), expiry.to_string());
info.auth_provider = Some(provider);
return Ok(Self::RefreshableToken(RefreshableToken::Exec(Arc::new(
Mutex::new((SecretString::from(token), expiry, info)),
))));
}
ProviderToken::GcpCommand(token, None) => {
return Ok(Self::Bearer(SecretString::from(token)));
}
#[cfg(feature = "oauth")]
ProviderToken::GcpOauth(gcp) => {
return Ok(Self::RefreshableToken(RefreshableToken::GcpOauth(Arc::new(
Mutex::new(gcp),
))));
}
}
}
if let (Some(u), Some(p)) = (&auth_info.username, &auth_info.password) {
return Ok(Self::Basic(u.to_owned(), p.to_owned()));
}
if let Some(token) = &auth_info.token {
return Ok(Self::Bearer(token.clone()));
}
if let Some(file) = &auth_info.token_file {
return Ok(Self::RefreshableToken(RefreshableToken::File(Arc::new(
RwLock::new(TokenFile::new(file)?),
))));
}
if let Some(exec) = &auth_info.exec {
let creds = auth_exec(exec)?;
let status = creds.status.ok_or(Error::ExecPluginFailed)?;
let expiration = status
.expiration_timestamp
.map(|ts| ts.parse())
.transpose()
.map_err(Error::MalformedTokenExpirationDate)?;
if let (Some(client_certificate_data), Some(client_key_data)) =
(status.client_certificate_data, status.client_key_data)
{
return Ok(Self::Certificate(
client_certificate_data,
client_key_data.into(),
expiration,
));
}
match (status.token.map(SecretString::from), expiration) {
(Some(token), Some(expire)) => Ok(Self::RefreshableToken(RefreshableToken::Exec(Arc::new(
Mutex::new((token, expire, auth_info.clone())),
)))),
(Some(token), None) => Ok(Self::Bearer(token)),
_ => Ok(Self::None),
}
} else {
Ok(Self::None)
}
}
}
/// Intermediate result of resolving an auth provider, before it is wrapped into an `Auth`.
enum ProviderToken {
// With the `oidc` feature: an OIDC source built from the provider config.
#[cfg(feature = "oidc")]
Oidc(oidc::Oidc),
// Without the `oidc` feature: the static `id-token` taken from the provider config.
#[cfg(not(feature = "oidc"))]
Oidc(String),
// Token from the gcp provider config or its helper command, with an optional expiry.
GcpCommand(String, Option<Timestamp>),
// GCP OAuth credentials source.
#[cfg(feature = "oauth")]
GcpOauth(oauth::Gcp),
}
fn token_from_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
match provider.name.as_ref() {
"oidc" => token_from_oidc_provider(provider),
"gcp" => token_from_gcp_provider(provider),
"azure" => Err(Error::AuthExec(
"The azure auth plugin is not supported; use https://github.com/Azure/kubelogin instead".into(),
)),
_ => Err(Error::AuthExec(format!(
"Authentication with provider {:} not supported",
provider.name
))),
}
}
/// Builds an OIDC token source from the provider's configuration.
///
/// # Errors
///
/// Returns [`Error::Oidc`] when the configuration is rejected.
#[cfg(feature = "oidc")]
fn token_from_oidc_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
    match oidc::Oidc::from_config(&provider.config) {
        Ok(source) => Ok(ProviderToken::Oidc(source)),
        Err(err) => Err(Error::Oidc(err)),
    }
}
#[cfg(not(feature = "oidc"))]
fn token_from_oidc_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
match provider.config.get("id-token") {
Some(id_token) => Ok(ProviderToken::Oidc(id_token.clone())),
None => Err(Error::AuthExec(
"No id-token for oidc Authentication provider".into(),
)),
}
}
fn token_from_gcp_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
if let Some(id_token) = provider.config.get("id-token") {
return Ok(ProviderToken::GcpCommand(id_token.clone(), None));
}
if let Some(access_token) = provider.config.get("access-token")
&& let Some(expiry) = provider.config.get("expiry")
{
let expiry_date = expiry
.parse::<Timestamp>()
.map_err(Error::MalformedTokenExpirationDate)?;
if Timestamp::now() + SIXTY_SEC < expiry_date {
return Ok(ProviderToken::GcpCommand(access_token.clone(), Some(expiry_date)));
}
}
if let Some(cmd) = provider.config.get("cmd-path") {
let params = provider.config.get("cmd-args").cloned().unwrap_or_default();
let drop_env = provider.config.get("cmd-drop-env").cloned().unwrap_or_default();
let mut command = Command::new(cmd);
for env in drop_env.trim().split(' ') {
command.env_remove(env);
}
let output = command
.args(params.trim().split(' '))
.output()
.map_err(|e| Error::AuthExec(format!("Executing {cmd:} failed: {e:?}")))?;
if !output.status.success() {
return Err(Error::AuthExecRun {
cmd: format!("{cmd} {params}"),
status: output.status,
out: output,
});
}
if let Some(field) = provider.config.get("token-key") {
let json_output: serde_json::Value =
serde_json::from_slice(&output.stdout).map_err(Error::ParseTokenKey)?;
let token = extract_value(&json_output, "token-key", field)?;
if let Some(field) = provider.config.get("expiry-key") {
let expiry = extract_value(&json_output, "expiry-key", field)?;
let expiry = expiry
.parse::<Timestamp>()
.map_err(Error::MalformedTokenExpirationDate)?;
return Ok(ProviderToken::GcpCommand(token, Some(expiry)));
} else {
return Ok(ProviderToken::GcpCommand(token, None));
}
} else {
let token = std::str::from_utf8(&output.stdout)
.map_err(|e| Error::AuthExec(format!("Result is not a string {e:?} ")))?
.to_owned();
return Ok(ProviderToken::GcpCommand(token, None));
}
}
#[cfg(feature = "oauth")]
{
Ok(ProviderToken::GcpOauth(
oauth::Gcp::default_credentials_with_scopes(provider.config.get("scopes"))
.map_err(Error::OAuth)?,
))
}
#[cfg(not(feature = "oauth"))]
{
Err(Error::AuthExec(
"Enable oauth feature to use Google Application Credentials-based token source".into(),
))
}
}
fn extract_value(json: &serde_json::Value, context: &str, path: &str) -> Result<String, Error> {
let path = {
let p = path.trim_matches(|c| c == '"' || c == '{' || c == '}');
if p.starts_with('$') {
p
} else if p.starts_with('.') {
&format!("${p}")
} else {
&format!("$.{p}")
}
};
let res = json.query(path).map_err(|err| {
Error::AuthExec(format!(
"Failed to query {context:?} as a JsonPath: {path}\n
Error: {err}"
))
})?;
let Some(jval) = res.into_iter().next() else {
return Err(Error::AuthExec(format!(
"Target {context:?} value {path:?} not found"
)));
};
let val = jval.as_str().ok_or(Error::AuthExec(format!(
"Target {context:?} value {path:?} is not a string"
)))?;
Ok(val.to_string())
}
/// The `ExecCredential` document exchanged with an exec authentication plugin.
///
/// It is serialized into the plugin's `KUBERNETES_EXEC_INFO` environment
/// variable and deserialized from the plugin's standard output.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecCredential {
/// Object kind; set to `"ExecCredential"` when sent to the plugin.
pub kind: Option<String>,
/// API version, copied from the exec configuration when sent to the plugin.
#[serde(rename = "apiVersion")]
pub api_version: Option<String>,
/// Input handed to the plugin.
pub spec: Option<ExecCredentialSpec>,
/// Credentials returned by the plugin; omitted from the serialized form when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<ExecCredentialStatus>,
}
/// Input passed to an exec plugin inside an [`ExecCredential`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecCredentialSpec {
// Whether the plugin is being run with the parent's stdin available to it.
#[serde(skip_serializing_if = "Option::is_none")]
interactive: Option<bool>,
// Cluster information, populated only when the exec config asks for it.
#[serde(skip_serializing_if = "Option::is_none")]
cluster: Option<ExecAuthCluster>,
}
/// Credentials returned by an exec plugin inside an [`ExecCredential`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecCredentialStatus {
/// When the returned credentials expire, as an unparsed timestamp string.
#[serde(rename = "expirationTimestamp")]
pub expiration_timestamp: Option<String>,
/// Bearer token, if the plugin returned one.
pub token: Option<String>,
/// Client certificate data, if the plugin returned a certificate.
#[serde(rename = "clientCertificateData")]
pub client_certificate_data: Option<String>,
/// Client key data accompanying the certificate.
#[serde(rename = "clientKeyData")]
pub client_key_data: Option<String>,
}
fn auth_exec(auth: &ExecConfig) -> Result<ExecCredential, Error> {
let mut cmd = match &auth.command {
Some(cmd) => Command::new(cmd),
None => return Err(Error::MissingCommand),
};
if let Some(args) = &auth.args {
cmd.args(args);
}
if let Some(env) = &auth.env {
let envs = env
.iter()
.flat_map(|env| match (env.get("name"), env.get("value")) {
(Some(name), Some(value)) => Some((name, value)),
_ => None,
});
cmd.envs(envs);
}
let interactive = auth.interactive_mode != Some(ExecInteractiveMode::Never);
if interactive {
cmd.stdin(std::process::Stdio::inherit());
cmd.stderr(std::process::Stdio::inherit());
} else {
cmd.stdin(std::process::Stdio::piped());
}
let mut exec_credential_spec = ExecCredentialSpec {
interactive: Some(interactive),
cluster: None,
};
if auth.provide_cluster_info {
exec_credential_spec.cluster = Some(auth.cluster.clone().ok_or(Error::ExecMissingClusterInfo)?);
}
let exec_info = serde_json::to_string(&ExecCredential {
api_version: auth.api_version.clone(),
kind: "ExecCredential".to_string().into(),
spec: Some(exec_credential_spec),
status: None,
})
.map_err(Error::AuthExecSerialize)?;
cmd.env("KUBERNETES_EXEC_INFO", exec_info);
if let Some(envs) = &auth.drop_env {
for env in envs {
cmd.env_remove(env);
}
}
#[cfg(target_os = "windows")]
{
const CREATE_NO_WINDOW: u32 = 0x08000000;
cmd.creation_flags(CREATE_NO_WINDOW);
}
let out = cmd.output().map_err(Error::AuthExecStart)?;
if !out.status.success() {
return Err(Error::AuthExecRun {
cmd: format!("{cmd:?}"),
status: out.status,
out,
});
}
let creds = serde_json::from_slice(&out.stdout).map_err(Error::AuthExecParse)?;
Ok(creds)
}
// Unit tests for gcp command-based token resolution, exec token refresh, and
// token-file caching.
#[cfg(test)]
mod test {
use crate::config::Kubeconfig;
use std::time::{Duration, Instant};
use super::*;
// Builds an `AuthInfo` for a gcp auth provider by parsing a kubeconfig whose
// provider runs `cmd_path` with `cmd_args` and reads the token and expiry from
// the `credential` object of the command's JSON output.
fn gcp_auth_info(cmd_path: &str, cmd_args: &str) -> AuthInfo {
let test_file = format!(
r#"
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: XXXXXXX
server: https://36.XXX.XXX.XX
name: generic-name
contexts:
- context:
cluster: generic-name
user: generic-name
name: generic-name
current-context: generic-name
kind: Config
preferences: {{}}
users:
- name: generic-name
user:
auth-provider:
config:
cmd-args: {cmd_args}
cmd-path: {cmd_path}
expiry-key: '{{.credential.token_expiry}}'
token-key: '{{.credential.access_token}}'
name: gcp
"#
);
let config: Kubeconfig = serde_yaml::from_str(&test_file).unwrap();
config.auth_infos[0].auth_info.clone().unwrap()
}
// Renders the JSON document the fake credential command prints.
fn cred_json(token: &str, expiry: &str) -> String {
format!(
r#"{{"something": "else", "credential": {{"access_token": "{token}", "token_expiry": "{expiry}"}}}}"#
)
}
// Resolving the provider runs the command and yields a refreshable exec token
// whose stored provider config has the fetched access token cached in it.
#[tokio::test]
#[ignore = "fails on windows mysteriously"]
async fn exec_auth_command() -> Result<(), Error> {
let expiry = (Timestamp::now() + SIXTY_SEC).to_string();
let auth_info = gcp_auth_info("echo", &format!("'{}'", cred_json("my_token", &expiry)));
match Auth::try_from(&auth_info).unwrap() {
Auth::RefreshableToken(RefreshableToken::Exec(refreshable)) => {
let (token, _expire, info) = Arc::try_unwrap(refreshable).unwrap().into_inner();
assert_eq!(token.expose_secret(), &"my_token".to_owned());
let config = info.auth_provider.unwrap().config;
assert_eq!(config.get("access-token"), Some(&"my_token".to_owned()));
}
_ => unreachable!(),
}
Ok(())
}
// A stale exec token is refreshed by `to_header`, and the refreshed token and
// expiry are written back into the shared state.
#[tokio::test]
#[ignore = "shells out to echo/sh; skipped on windows"]
async fn exec_token_refresh_via_to_header() -> Result<(), Error> {
let fresh_expiry = (Timestamp::now() + SignedDuration::from_secs(3600)).to_string();
let auth_info = gcp_auth_info("echo", &format!("'{}'", cred_json("my_token", &fresh_expiry)));
// An expiry in the past forces the refresh path.
let stale_expiry = Timestamp::now() - SIXTY_SEC;
let refreshable = RefreshableToken::Exec(Arc::new(Mutex::new((
SecretString::from("stale"),
stale_expiry,
auth_info,
))));
let header = refreshable.to_header().await?;
assert_eq!(header, HeaderValue::from_static("Bearer my_token"));
if let RefreshableToken::Exec(data) = &refreshable {
let locked = data.lock().await;
assert_eq!(locked.0.expose_secret(), "my_token");
assert!(locked.1 > Timestamp::now(), "expiry should be in the future");
} else {
unreachable!();
}
Ok(())
}
// On a single-threaded runtime, a slow credential command must not stall other
// tasks: a 50 ms timer running alongside the refresh has to fire well before
// the 300 ms command finishes.
#[tokio::test(flavor = "current_thread")]
#[ignore = "shells out to echo/sh; skipped on windows"]
async fn exec_token_refresh_does_not_block_runtime() -> Result<(), Error> {
use std::io::Write;
let fresh_expiry = (Timestamp::now() + SignedDuration::from_secs(3600)).to_string();
// Script that sleeps before printing the credential JSON.
let mut script = tempfile::NamedTempFile::new().unwrap();
writeln!(script, "#!/bin/sh").unwrap();
writeln!(script, "sleep 0.3").unwrap();
writeln!(script, "echo '{}'", cred_json("my_token", &fresh_expiry)).unwrap();
script.flush().unwrap();
let script_path = script.path().to_str().unwrap().to_owned();
let auth_info = gcp_auth_info("sh", &script_path);
let stale_expiry = Timestamp::now() - SIXTY_SEC;
let refreshable = RefreshableToken::Exec(Arc::new(Mutex::new((
SecretString::from("stale"),
stale_expiry,
auth_info,
))));
let start = Instant::now();
let (header, timer_elapsed) = tokio::join!(refreshable.to_header(), async {
tokio::time::sleep(Duration::from_millis(50)).await;
start.elapsed()
});
assert!(
timer_elapsed < Duration::from_millis(200),
"timer took {timer_elapsed:?}; to_header likely blocked the runtime"
);
assert_eq!(header?, HeaderValue::from_static("Bearer my_token"));
Ok(())
}
// The token file is cached until its deadline, then re-read from disk.
#[test]
fn token_file() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(file.path(), "token1").unwrap();
let mut token_file = TokenFile::new(file.path()).unwrap();
assert_eq!(token_file.cached_token().unwrap(), "token1");
assert!(!token_file.is_expiring());
assert_eq!(token_file.token(), "token1");
// A change on disk is not observed while the cache is still fresh.
std::fs::write(file.path(), "token2").unwrap();
assert_eq!(token_file.token(), "token1");
// Forcing the deadline to now makes the next `token()` call re-read the file.
token_file.expires_at = Timestamp::now();
assert!(token_file.is_expiring());
assert_eq!(token_file.cached_token(), None);
assert_eq!(token_file.token(), "token2");
assert!(!token_file.is_expiring());
assert_eq!(token_file.cached_token().unwrap(), "token2");
}
}