token_review/
client.rs

1use crate::Cache;
2use chrono::Utc;
3use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec, TokenReviewStatus};
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio::sync::RwLock;
9use tower::Service;
10
11#[derive(Clone)]
12pub struct Client {
13    client: kube::Client,
14    cache: Arc<RwLock<Cache>>,
15}
16
17impl Client {
18    pub fn new(client: kube::Client) -> Self {
19        Self {
20            client,
21            cache: Arc::new(RwLock::new(Cache::new())),
22        }
23    }
24
25    pub async fn try_default() -> Result<Self, kube::Error> {
26        Ok(Self::new(kube::Client::try_default().await?))
27    }
28}
29
30impl Service<TokenReviewSpec> for &Client {
31    type Response = Option<TokenReviewStatus>;
32    type Error = kube::Error;
33    type Future =
34        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
35
36    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37        Poll::Ready(Ok(()))
38    }
39
40    fn call(&mut self, request: TokenReviewSpec) -> Self::Future {
41        let client = self.client.clone();
42        let cache = self.cache.clone();
43        Box::pin(async move {
44            if crate::exp(&request) > Some(Utc::now().timestamp()) {
45                let status = cache.read().await.get(&request).cloned();
46                if let Some(status) = status {
47                    Ok(Some(status))
48                } else {
49                    let status = kube::Api::all(client)
50                        .create(
51                            &Default::default(),
52                            &TokenReview {
53                                spec: request.clone(),
54                                ..Default::default()
55                            },
56                        )
57                        .await?
58                        .status;
59                    if let Some(status) = &status {
60                        cache.write().await.put(request, status.clone());
61                    }
62                    Ok(status)
63                }
64            } else {
65                Ok(None)
66            }
67        })
68    }
69}
70
71impl Service<TokenReviewSpec> for Client {
72    type Response = Option<TokenReviewStatus>;
73    type Error = kube::Error;
74    type Future =
75        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
76
77    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78        (&*self).poll_ready(cx)
79    }
80
81    fn call(&mut self, request: TokenReviewSpec) -> Self::Future {
82        (&*self).call(request)
83    }
84}