Skip to main content

kube_client/client/auth/
mod.rs

1use futures::future::BoxFuture;
2use http::{
3    HeaderValue, Request,
4    header::{AUTHORIZATION, InvalidHeaderValue},
5};
6use jiff::{SignedDuration, Timestamp};
7use jsonpath_rust::JsonPath;
8use secrecy::{ExposeSecret, SecretString};
9use serde::{Deserialize, Serialize};
10use std::{
11    path::{Path, PathBuf},
12    process::Command,
13    sync::Arc,
14};
15use thiserror::Error;
16use tokio::sync::{Mutex, RwLock};
17use tower::{BoxError, filter::AsyncPredicate};
18
19use crate::config::{AuthInfo, AuthProviderConfig, ExecAuthCluster, ExecConfig, ExecInteractiveMode};
20
21#[cfg(feature = "oauth")] mod oauth;
22#[cfg(feature = "oauth")] pub use oauth::Error as OAuthError;
23#[cfg(feature = "oidc")] mod oidc;
24#[cfg(feature = "oidc")] pub use oidc::errors as oidc_errors;
25#[cfg(target_os = "windows")] use std::os::windows::process::CommandExt;
26
27#[derive(Error, Debug)]
28/// Client auth errors
29pub enum Error {
30    /// Invalid basic auth
31    #[error("invalid basic auth: {0}")]
32    InvalidBasicAuth(#[source] InvalidHeaderValue),
33
34    /// Invalid bearer token
35    #[error("invalid bearer token: {0}")]
36    InvalidBearerToken(#[source] InvalidHeaderValue),
37
38    /// Tried to refresh a token and got a non-refreshable token response
39    #[error("tried to refresh a token and got a non-refreshable token response")]
40    UnrefreshableTokenResponse,
41
42    /// Exec plugin response did not contain a status
43    #[error("exec-plugin response did not contain a status")]
44    ExecPluginFailed,
45
46    /// Malformed token expiration date
47    #[error("malformed token expiration date: {0}")]
48    MalformedTokenExpirationDate(#[source] jiff::Error),
49
50    /// Failed to start auth exec
51    #[error("unable to run auth exec: {0}")]
52    AuthExecStart(#[source] std::io::Error),
53
54    /// Failed to run auth exec command
55    #[error("auth exec command '{cmd}' failed with status {status}: {out:?}")]
56    AuthExecRun {
57        /// The failed command
58        cmd: String,
59        /// The exit status or exit code of the failed command
60        status: std::process::ExitStatus,
61        /// Stdout/Stderr of the failed command
62        out: std::process::Output,
63    },
64
65    /// Failed to parse auth exec output
66    #[error("failed to parse auth exec output: {0}")]
67    AuthExecParse(#[source] serde_json::Error),
68
69    /// Fail to serialize input
70    #[error("failed to serialize input: {0}")]
71    AuthExecSerialize(#[source] serde_json::Error),
72
73    /// Failed to exec auth
74    #[error("failed exec auth: {0}")]
75    AuthExec(String),
76
77    /// Failed to read token file
78    #[error("failed to read token file '{1:?}': {0}")]
79    ReadTokenFile(#[source] std::io::Error, PathBuf),
80
81    /// Failed to parse token-key
82    #[error("failed to parse token-key")]
83    ParseTokenKey(#[source] serde_json::Error),
84
85    /// command was missing from exec config
86    #[error("command must be specified to use exec authentication plugin")]
87    MissingCommand,
88
89    /// OAuth error
90    #[cfg(feature = "oauth")]
91    #[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
92    #[error("failed OAuth: {0}")]
93    OAuth(#[source] OAuthError),
94
95    /// OIDC error
96    #[cfg(feature = "oidc")]
97    #[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
98    #[error("failed OIDC: {0}")]
99    Oidc(#[source] oidc_errors::Error),
100
101    /// cluster spec missing while `provideClusterInfo` is true
102    #[error("Cluster spec must be populated when `provideClusterInfo` is true")]
103    ExecMissingClusterInfo,
104
105    /// No valid native root CA certificates found
106    #[error("No valid native root CA certificates found")]
107    NoValidNativeRootCA(#[source] std::io::Error),
108}
109
110#[derive(Debug, Clone)]
111pub(crate) enum Auth {
112    None,
113    Basic(String, SecretString),
114    Bearer(SecretString),
115    RefreshableToken(RefreshableToken),
116    Certificate(String, SecretString, Option<Timestamp>),
117}
118
119// Token file reference. Reloads at least once per minute.
120#[derive(Debug)]
121pub struct TokenFile {
122    path: PathBuf,
123    token: SecretString,
124    expires_at: Timestamp,
125}
126
127impl TokenFile {
128    fn new<P: AsRef<Path>>(path: P) -> Result<TokenFile, Error> {
129        let token = std::fs::read_to_string(&path)
130            .map_err(|source| Error::ReadTokenFile(source, path.as_ref().to_owned()))?;
131        Ok(Self {
132            path: path.as_ref().to_owned(),
133            token: SecretString::from(token),
134            // Try to reload at least once a minute
135            expires_at: Timestamp::now() + SIXTY_SEC,
136        })
137    }
138
139    fn is_expiring(&self) -> bool {
140        Timestamp::now() + TEN_SEC > self.expires_at
141    }
142
143    /// Get the cached token. Returns `None` if it's expiring.
144    fn cached_token(&self) -> Option<&str> {
145        (!self.is_expiring()).then(|| self.token.expose_secret())
146    }
147
148    /// Get a token. Reloads from file if the cached token is expiring.
149    fn token(&mut self) -> &str {
150        if self.is_expiring() {
151            // > If reload from file fails, the last-read token should be used to avoid breaking
152            // > clients that make token files available on process start and then remove them to
153            // > limit credential exposure.
154            // > https://github.com/kubernetes/kubernetes/issues/68164
155            if let Ok(token) = std::fs::read_to_string(&self.path) {
156                self.token = SecretString::from(token);
157            }
158            self.expires_at = Timestamp::now() + SIXTY_SEC;
159        }
160        self.token.expose_secret()
161    }
162}
163
164/// Common constant for checking if an auth token is close to expiring
165pub const TEN_SEC: SignedDuration = SignedDuration::from_secs(10);
166/// Common duration for time between reloads
167const SIXTY_SEC: SignedDuration = SignedDuration::from_secs(60);
168
169// See https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/client-go/plugin/pkg/client/auth
170// for the list of auth-plugins supported by client-go.
171// We currently support the following:
172// - exec
173// - token-file refreshed at least once per minute
174// - gcp: command based token source (exec)
175// - gcp: application credential based token source (requires `oauth` feature)
176//
177// Note that the visibility must be `pub` for `impl Layer for AuthLayer`, but this is not exported from the crate.
178// It's not accessible from outside and not shown on docs.
179#[derive(Debug, Clone)]
180pub enum RefreshableToken {
181    Exec(Arc<Mutex<(SecretString, Timestamp, AuthInfo)>>),
182    File(Arc<RwLock<TokenFile>>),
183    #[cfg(feature = "oauth")]
184    GcpOauth(Arc<Mutex<oauth::Gcp>>),
185    #[cfg(feature = "oidc")]
186    Oidc(Arc<Mutex<oidc::Oidc>>),
187}
188
189// For use with `AsyncFilterLayer` to add `Authorization` header with a refreshed token.
190impl<B> AsyncPredicate<Request<B>> for RefreshableToken
191where
192    B: http_body::Body + Send + 'static,
193{
194    type Future = BoxFuture<'static, Result<Request<B>, BoxError>>;
195    type Request = Request<B>;
196
197    fn check(&mut self, mut request: Self::Request) -> Self::Future {
198        let refreshable = self.clone();
199        Box::pin(async move {
200            refreshable.to_header().await.map_err(Into::into).map(|value| {
201                request.headers_mut().insert(AUTHORIZATION, value);
202                request
203            })
204        })
205    }
206}
207
208impl RefreshableToken {
209    async fn to_header(&self) -> Result<HeaderValue, Error> {
210        match self {
211            RefreshableToken::Exec(data) => {
212                let mut locked_data = data.lock().await;
213                // Add some wiggle room onto the current timestamp so we don't get any race
214                // conditions where the token expires while we are refreshing
215                if Timestamp::now() + SIXTY_SEC >= locked_data.1 {
216                    // Run blocking exec command on the blocking threadpool to avoid
217                    // stalling the tokio worker during token refresh.
218                    // TODO Improve refreshing exec to avoid `Auth::try_from`
219                    let auth_info = locked_data.2.clone();
220                    let auth = tokio::task::spawn_blocking(move || Auth::try_from(&auth_info))
221                        .await
222                        .map_err(|e| Error::AuthExec(format!("failed to spawn blocking auth task: {e}")))??;
223                    match auth {
224                        Auth::None | Auth::Basic(_, _) | Auth::Bearer(_) | Auth::Certificate(_, _, _) => {
225                            return Err(Error::UnrefreshableTokenResponse);
226                        }
227
228                        Auth::RefreshableToken(RefreshableToken::Exec(d)) => {
229                            let (new_token, new_expire, new_info) = Arc::try_unwrap(d)
230                                .expect("Unable to unwrap Arc, this is likely a programming error")
231                                .into_inner();
232                            locked_data.0 = new_token;
233                            locked_data.1 = new_expire;
234                            locked_data.2 = new_info;
235                        }
236
237                        // Unreachable because the token source does not change
238                        Auth::RefreshableToken(RefreshableToken::File(_)) => unreachable!(),
239                        #[cfg(feature = "oauth")]
240                        Auth::RefreshableToken(RefreshableToken::GcpOauth(_)) => unreachable!(),
241                        #[cfg(feature = "oidc")]
242                        Auth::RefreshableToken(RefreshableToken::Oidc(_)) => unreachable!(),
243                    }
244                }
245
246                bearer_header(locked_data.0.expose_secret())
247            }
248
249            RefreshableToken::File(token_file) => {
250                let guard = token_file.read().await;
251                if let Some(header) = guard.cached_token().map(bearer_header) {
252                    return header;
253                }
254                // Drop the read guard before a write lock attempt to prevent deadlock.
255                drop(guard);
256                // Note that `token()` only reloads if the cached token is expiring.
257                // A separate method to conditionally reload minimizes the need for an exclusive access.
258                bearer_header(token_file.write().await.token())
259            }
260
261            #[cfg(feature = "oauth")]
262            RefreshableToken::GcpOauth(data) => {
263                let gcp_oauth = data.lock().await;
264                let token = (*gcp_oauth).token().await.map_err(Error::OAuth)?;
265                bearer_header(&token.access_token)
266            }
267
268            #[cfg(feature = "oidc")]
269            RefreshableToken::Oidc(oidc) => {
270                let token = oidc.lock().await.id_token().await.map_err(Error::Oidc)?;
271                bearer_header(&token)
272            }
273        }
274    }
275}
276
277fn bearer_header(token: &str) -> Result<HeaderValue, Error> {
278    let mut value = HeaderValue::try_from(format!("Bearer {token}")).map_err(Error::InvalidBearerToken)?;
279    value.set_sensitive(true);
280    Ok(value)
281}
282
283impl TryFrom<&AuthInfo> for Auth {
284    type Error = Error;
285
286    /// Loads the authentication header from the credentials available in the kubeconfig. This supports
287    /// exec plugins as well as specified in
288    /// https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins
289    fn try_from(auth_info: &AuthInfo) -> Result<Self, Self::Error> {
290        if let Some(provider) = &auth_info.auth_provider {
291            match token_from_provider(provider)? {
292                #[cfg(feature = "oidc")]
293                ProviderToken::Oidc(oidc) => {
294                    return Ok(Self::RefreshableToken(RefreshableToken::Oidc(Arc::new(
295                        Mutex::new(oidc),
296                    ))));
297                }
298
299                #[cfg(not(feature = "oidc"))]
300                ProviderToken::Oidc(token) => {
301                    return Ok(Self::Bearer(SecretString::from(token)));
302                }
303
304                ProviderToken::GcpCommand(token, Some(expiry)) => {
305                    let mut info = auth_info.clone();
306                    let mut provider = provider.clone();
307                    provider.config.insert("access-token".into(), token.clone());
308                    // `jiff::Timestamp` provides RFC3339 via `Display`, docs: https://docs.rs/jiff/latest/jiff/struct.Timestamp.html#impl-Display-for-Timestamp
309                    provider.config.insert("expiry".into(), expiry.to_string());
310                    info.auth_provider = Some(provider);
311                    return Ok(Self::RefreshableToken(RefreshableToken::Exec(Arc::new(
312                        Mutex::new((SecretString::from(token), expiry, info)),
313                    ))));
314                }
315
316                ProviderToken::GcpCommand(token, None) => {
317                    return Ok(Self::Bearer(SecretString::from(token)));
318                }
319
320                #[cfg(feature = "oauth")]
321                ProviderToken::GcpOauth(gcp) => {
322                    return Ok(Self::RefreshableToken(RefreshableToken::GcpOauth(Arc::new(
323                        Mutex::new(gcp),
324                    ))));
325                }
326            }
327        }
328
329        if let (Some(u), Some(p)) = (&auth_info.username, &auth_info.password) {
330            return Ok(Self::Basic(u.to_owned(), p.to_owned()));
331        }
332
333        // Inline token. Has precedence over `token_file`.
334        if let Some(token) = &auth_info.token {
335            return Ok(Self::Bearer(token.clone()));
336        }
337
338        // Token file reference. Must be reloaded at least once a minute.
339        if let Some(file) = &auth_info.token_file {
340            return Ok(Self::RefreshableToken(RefreshableToken::File(Arc::new(
341                RwLock::new(TokenFile::new(file)?),
342            ))));
343        }
344
345        if let Some(exec) = &auth_info.exec {
346            let creds = auth_exec(exec)?;
347            let status = creds.status.ok_or(Error::ExecPluginFailed)?;
348            let expiration = status
349                .expiration_timestamp
350                .map(|ts| ts.parse())
351                .transpose()
352                .map_err(Error::MalformedTokenExpirationDate)?;
353
354            if let (Some(client_certificate_data), Some(client_key_data)) =
355                (status.client_certificate_data, status.client_key_data)
356            {
357                return Ok(Self::Certificate(
358                    client_certificate_data,
359                    client_key_data.into(),
360                    expiration,
361                ));
362            }
363
364            match (status.token.map(SecretString::from), expiration) {
365                (Some(token), Some(expire)) => Ok(Self::RefreshableToken(RefreshableToken::Exec(Arc::new(
366                    Mutex::new((token, expire, auth_info.clone())),
367                )))),
368                (Some(token), None) => Ok(Self::Bearer(token)),
369                _ => Ok(Self::None),
370            }
371        } else {
372            Ok(Self::None)
373        }
374    }
375}
376
377// We need to differentiate providers because the keys/formats to store token expiration differs.
378enum ProviderToken {
379    #[cfg(feature = "oidc")]
380    Oidc(oidc::Oidc),
381    #[cfg(not(feature = "oidc"))]
382    Oidc(String),
383    // "access-token", "expiry" (RFC3339)
384    GcpCommand(String, Option<Timestamp>),
385    #[cfg(feature = "oauth")]
386    GcpOauth(oauth::Gcp),
387    // "access-token", "expires-on" (timestamp)
388    // Azure(String, Option<DateTime<Utc>>),
389}
390
391fn token_from_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
392    match provider.name.as_ref() {
393        "oidc" => token_from_oidc_provider(provider),
394        "gcp" => token_from_gcp_provider(provider),
395        "azure" => Err(Error::AuthExec(
396            "The azure auth plugin is not supported; use https://github.com/Azure/kubelogin instead".into(),
397        )),
398        _ => Err(Error::AuthExec(format!(
399            "Authentication with provider {:} not supported",
400            provider.name
401        ))),
402    }
403}
404
405#[cfg(feature = "oidc")]
406fn token_from_oidc_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
407    oidc::Oidc::from_config(&provider.config)
408        .map_err(Error::Oidc)
409        .map(ProviderToken::Oidc)
410}
411
412#[cfg(not(feature = "oidc"))]
413fn token_from_oidc_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
414    match provider.config.get("id-token") {
415        Some(id_token) => Ok(ProviderToken::Oidc(id_token.clone())),
416        None => Err(Error::AuthExec(
417            "No id-token for oidc Authentication provider".into(),
418        )),
419    }
420}
421
422fn token_from_gcp_provider(provider: &AuthProviderConfig) -> Result<ProviderToken, Error> {
423    if let Some(id_token) = provider.config.get("id-token") {
424        return Ok(ProviderToken::GcpCommand(id_token.clone(), None));
425    }
426
427    // Return cached access token if it's still valid
428    if let Some(access_token) = provider.config.get("access-token")
429        && let Some(expiry) = provider.config.get("expiry")
430    {
431        let expiry_date = expiry
432            .parse::<Timestamp>()
433            .map_err(Error::MalformedTokenExpirationDate)?;
434        if Timestamp::now() + SIXTY_SEC < expiry_date {
435            return Ok(ProviderToken::GcpCommand(access_token.clone(), Some(expiry_date)));
436        }
437    }
438
439    // Command-based token source
440    if let Some(cmd) = provider.config.get("cmd-path") {
441        let params = provider.config.get("cmd-args").cloned().unwrap_or_default();
442        // NB: This property does currently not exist upstream in client-go
443        // See https://github.com/kube-rs/kube/issues/1060
444        let drop_env = provider.config.get("cmd-drop-env").cloned().unwrap_or_default();
445        // TODO splitting args by space is not safe
446        let mut command = Command::new(cmd);
447        // Do not pass the following env vars to the command
448        for env in drop_env.trim().split(' ') {
449            command.env_remove(env);
450        }
451        let output = command
452            .args(params.trim().split(' '))
453            .output()
454            .map_err(|e| Error::AuthExec(format!("Executing {cmd:} failed: {e:?}")))?;
455
456        if !output.status.success() {
457            return Err(Error::AuthExecRun {
458                cmd: format!("{cmd} {params}"),
459                status: output.status,
460                out: output,
461            });
462        }
463
464        if let Some(field) = provider.config.get("token-key") {
465            let json_output: serde_json::Value =
466                serde_json::from_slice(&output.stdout).map_err(Error::ParseTokenKey)?;
467            let token = extract_value(&json_output, "token-key", field)?;
468            if let Some(field) = provider.config.get("expiry-key") {
469                let expiry = extract_value(&json_output, "expiry-key", field)?;
470                let expiry = expiry
471                    .parse::<Timestamp>()
472                    .map_err(Error::MalformedTokenExpirationDate)?;
473                return Ok(ProviderToken::GcpCommand(token, Some(expiry)));
474            } else {
475                return Ok(ProviderToken::GcpCommand(token, None));
476            }
477        } else {
478            let token = std::str::from_utf8(&output.stdout)
479                .map_err(|e| Error::AuthExec(format!("Result is not a string {e:?} ")))?
480                .to_owned();
481            return Ok(ProviderToken::GcpCommand(token, None));
482        }
483    }
484
485    // Google Application Credentials-based token source
486    #[cfg(feature = "oauth")]
487    {
488        Ok(ProviderToken::GcpOauth(
489            oauth::Gcp::default_credentials_with_scopes(provider.config.get("scopes"))
490                .map_err(Error::OAuth)?,
491        ))
492    }
493    #[cfg(not(feature = "oauth"))]
494    {
495        Err(Error::AuthExec(
496            "Enable oauth feature to use Google Application Credentials-based token source".into(),
497        ))
498    }
499}
500
501fn extract_value(json: &serde_json::Value, context: &str, path: &str) -> Result<String, Error> {
502    let path = {
503        let p = path.trim_matches(|c| c == '"' || c == '{' || c == '}');
504        if p.starts_with('$') {
505            p
506        } else if p.starts_with('.') {
507            &format!("${p}")
508        } else {
509            &format!("$.{p}")
510        }
511    };
512
513    let res = json.query(path).map_err(|err| {
514        Error::AuthExec(format!(
515            "Failed to query {context:?} as a JsonPath: {path}\n
516             Error: {err}"
517        ))
518    })?;
519
520    let Some(jval) = res.into_iter().next() else {
521        return Err(Error::AuthExec(format!(
522            "Target {context:?} value {path:?} not found"
523        )));
524    };
525
526    let val = jval.as_str().ok_or(Error::AuthExec(format!(
527        "Target {context:?} value {path:?} is not a string"
528    )))?;
529
530    Ok(val.to_string())
531}
532
533/// ExecCredentials is used by exec-based plugins to communicate credentials to
534/// HTTP transports.
535#[derive(Clone, Debug, Serialize, Deserialize)]
536pub struct ExecCredential {
537    pub kind: Option<String>,
538    #[serde(rename = "apiVersion")]
539    pub api_version: Option<String>,
540    pub spec: Option<ExecCredentialSpec>,
541    #[serde(skip_serializing_if = "Option::is_none")]
542    pub status: Option<ExecCredentialStatus>,
543}
544
545/// ExecCredenitalSpec holds request and runtime specific information provided
546/// by transport.
547#[derive(Clone, Debug, Serialize, Deserialize)]
548pub struct ExecCredentialSpec {
549    #[serde(skip_serializing_if = "Option::is_none")]
550    interactive: Option<bool>,
551
552    #[serde(skip_serializing_if = "Option::is_none")]
553    cluster: Option<ExecAuthCluster>,
554}
555
556/// ExecCredentialStatus holds credentials for the transport to use.
557#[derive(Clone, Debug, Serialize, Deserialize)]
558pub struct ExecCredentialStatus {
559    #[serde(rename = "expirationTimestamp")]
560    pub expiration_timestamp: Option<String>,
561    pub token: Option<String>,
562    #[serde(rename = "clientCertificateData")]
563    pub client_certificate_data: Option<String>,
564    #[serde(rename = "clientKeyData")]
565    pub client_key_data: Option<String>,
566}
567
568fn auth_exec(auth: &ExecConfig) -> Result<ExecCredential, Error> {
569    let mut cmd = match &auth.command {
570        Some(cmd) => Command::new(cmd),
571        None => return Err(Error::MissingCommand),
572    };
573
574    if let Some(args) = &auth.args {
575        cmd.args(args);
576    }
577    if let Some(env) = &auth.env {
578        let envs = env
579            .iter()
580            .flat_map(|env| match (env.get("name"), env.get("value")) {
581                (Some(name), Some(value)) => Some((name, value)),
582                _ => None,
583            });
584        cmd.envs(envs);
585    }
586
587    let interactive = auth.interactive_mode != Some(ExecInteractiveMode::Never);
588    if interactive {
589        cmd.stdin(std::process::Stdio::inherit());
590        cmd.stderr(std::process::Stdio::inherit());
591    } else {
592        cmd.stdin(std::process::Stdio::piped());
593    }
594
595    let mut exec_credential_spec = ExecCredentialSpec {
596        interactive: Some(interactive),
597        cluster: None,
598    };
599
600    if auth.provide_cluster_info {
601        exec_credential_spec.cluster = Some(auth.cluster.clone().ok_or(Error::ExecMissingClusterInfo)?);
602    }
603
604    // Provide exec info to child process
605    let exec_info = serde_json::to_string(&ExecCredential {
606        api_version: auth.api_version.clone(),
607        kind: "ExecCredential".to_string().into(),
608        spec: Some(exec_credential_spec),
609        status: None,
610    })
611    .map_err(Error::AuthExecSerialize)?;
612    cmd.env("KUBERNETES_EXEC_INFO", exec_info);
613
614    if let Some(envs) = &auth.drop_env {
615        for env in envs {
616            cmd.env_remove(env);
617        }
618    }
619
620    #[cfg(target_os = "windows")]
621    {
622        const CREATE_NO_WINDOW: u32 = 0x08000000;
623        cmd.creation_flags(CREATE_NO_WINDOW);
624    }
625
626    let out = cmd.output().map_err(Error::AuthExecStart)?;
627    if !out.status.success() {
628        return Err(Error::AuthExecRun {
629            cmd: format!("{cmd:?}"),
630            status: out.status,
631            out,
632        });
633    }
634    let creds = serde_json::from_slice(&out.stdout).map_err(Error::AuthExecParse)?;
635
636    Ok(creds)
637}
638
639#[cfg(test)]
640mod test {
641    use crate::config::Kubeconfig;
642    use std::time::{Duration, Instant};
643
644    use super::*;
645
646    /// Build an `AuthInfo` whose gcp auth-provider runs `cmd_path cmd_args` to emit credential JSON.
647    fn gcp_auth_info(cmd_path: &str, cmd_args: &str) -> AuthInfo {
648        let test_file = format!(
649            r#"
650        apiVersion: v1
651        clusters:
652        - cluster:
653            certificate-authority-data: XXXXXXX
654            server: https://36.XXX.XXX.XX
655          name: generic-name
656        contexts:
657        - context:
658            cluster: generic-name
659            user: generic-name
660          name: generic-name
661        current-context: generic-name
662        kind: Config
663        preferences: {{}}
664        users:
665        - name: generic-name
666          user:
667            auth-provider:
668              config:
669                cmd-args: {cmd_args}
670                cmd-path: {cmd_path}
671                expiry-key: '{{.credential.token_expiry}}'
672                token-key: '{{.credential.access_token}}'
673              name: gcp
674        "#
675        );
676        let config: Kubeconfig = serde_yaml::from_str(&test_file).unwrap();
677        config.auth_infos[0].auth_info.clone().unwrap()
678    }
679
680    fn cred_json(token: &str, expiry: &str) -> String {
681        format!(
682            r#"{{"something": "else", "credential": {{"access_token": "{token}", "token_expiry": "{expiry}"}}}}"#
683        )
684    }
685
686    #[tokio::test]
687    #[ignore = "fails on windows mysteriously"]
688    async fn exec_auth_command() -> Result<(), Error> {
689        let expiry = (Timestamp::now() + SIXTY_SEC).to_string();
690        let auth_info = gcp_auth_info("echo", &format!("'{}'", cred_json("my_token", &expiry)));
691        match Auth::try_from(&auth_info).unwrap() {
692            Auth::RefreshableToken(RefreshableToken::Exec(refreshable)) => {
693                let (token, _expire, info) = Arc::try_unwrap(refreshable).unwrap().into_inner();
694                assert_eq!(token.expose_secret(), &"my_token".to_owned());
695                let config = info.auth_provider.unwrap().config;
696                assert_eq!(config.get("access-token"), Some(&"my_token".to_owned()));
697            }
698            _ => unreachable!(),
699        }
700        Ok(())
701    }
702
703    #[tokio::test]
704    #[ignore = "shells out to echo/sh; skipped on windows"]
705    async fn exec_token_refresh_via_to_header() -> Result<(), Error> {
706        let fresh_expiry = (Timestamp::now() + SignedDuration::from_secs(3600)).to_string();
707        let auth_info = gcp_auth_info("echo", &format!("'{}'", cred_json("my_token", &fresh_expiry)));
708        // Seed with a stale token + past expiry to force the refresh branch.
709        let stale_expiry = Timestamp::now() - SIXTY_SEC;
710        let refreshable = RefreshableToken::Exec(Arc::new(Mutex::new((
711            SecretString::from("stale"),
712            stale_expiry,
713            auth_info,
714        ))));
715
716        let header = refreshable.to_header().await?;
717        assert_eq!(header, HeaderValue::from_static("Bearer my_token"));
718
719        // Cached state should have been updated in place.
720        if let RefreshableToken::Exec(data) = &refreshable {
721            let locked = data.lock().await;
722            assert_eq!(locked.0.expose_secret(), "my_token");
723            assert!(locked.1 > Timestamp::now(), "expiry should be in the future");
724        } else {
725            unreachable!();
726        }
727        Ok(())
728    }
729
730    #[tokio::test(flavor = "current_thread")]
731    #[ignore = "shells out to echo/sh; skipped on windows"]
732    async fn exec_token_refresh_does_not_block_runtime() -> Result<(), Error> {
733        use std::io::Write;
734        let fresh_expiry = (Timestamp::now() + SignedDuration::from_secs(3600)).to_string();
735        // Write a script that sleeps ~300ms before emitting credentials; the gcp provider
736        // splits cmd-args on spaces so we can't inline this via `sh -c`.
737        let mut script = tempfile::NamedTempFile::new().unwrap();
738        writeln!(script, "#!/bin/sh").unwrap();
739        writeln!(script, "sleep 0.3").unwrap();
740        writeln!(script, "echo '{}'", cred_json("my_token", &fresh_expiry)).unwrap();
741        script.flush().unwrap();
742        let script_path = script.path().to_str().unwrap().to_owned();
743
744        let auth_info = gcp_auth_info("sh", &script_path);
745        let stale_expiry = Timestamp::now() - SIXTY_SEC;
746        let refreshable = RefreshableToken::Exec(Arc::new(Mutex::new((
747            SecretString::from("stale"),
748            stale_expiry,
749            auth_info,
750        ))));
751
752        // On a current_thread runtime, if to_header() blocked the worker the 50ms
753        // timer below could not fire until the 300ms exec completed.
754        let start = Instant::now();
755        let (header, timer_elapsed) = tokio::join!(refreshable.to_header(), async {
756            tokio::time::sleep(Duration::from_millis(50)).await;
757            start.elapsed()
758        });
759
760        assert!(
761            timer_elapsed < Duration::from_millis(200),
762            "timer took {timer_elapsed:?}; to_header likely blocked the runtime"
763        );
764        assert_eq!(header?, HeaderValue::from_static("Bearer my_token"));
765        Ok(())
766    }
767
768    #[test]
769    fn token_file() {
770        let file = tempfile::NamedTempFile::new().unwrap();
771        std::fs::write(file.path(), "token1").unwrap();
772        let mut token_file = TokenFile::new(file.path()).unwrap();
773        assert_eq!(token_file.cached_token().unwrap(), "token1");
774        assert!(!token_file.is_expiring());
775        assert_eq!(token_file.token(), "token1");
776        // Doesn't reload unless expiring
777        std::fs::write(file.path(), "token2").unwrap();
778        assert_eq!(token_file.token(), "token1");
779
780        token_file.expires_at = Timestamp::now();
781        assert!(token_file.is_expiring());
782        assert_eq!(token_file.cached_token(), None);
783        assert_eq!(token_file.token(), "token2");
784        assert!(!token_file.is_expiring());
785        assert_eq!(token_file.cached_token().unwrap(), "token2");
786    }
787}