1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use crate::Cache;
use chrono::Utc;
use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec, TokenReviewStatus};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::RwLock;
use tower::Service;

/// Token-review client that pairs a Kubernetes API client with a cache.
///
/// `Clone` is derived; because the cache is held behind an [`Arc`], every
/// clone of a `Client` refers to the same cache instance.
#[derive(Clone)]
pub struct Client {
    /// Kubernetes client handle.
    client: kube::Client,
    /// Cache shared between clones, guarded by an async read/write lock.
    cache: Arc<RwLock<Cache>>,
}

impl Client {
    /// Wraps an existing [`kube::Client`], starting with a fresh cache
    /// obtained from `Cache::new()`.
    pub fn new(client: kube::Client) -> Self {
        let cache = Arc::new(RwLock::new(Cache::new()));
        Self { client, cache }
    }

    /// Builds a `Client` on top of `kube::Client::try_default()`.
    ///
    /// # Errors
    ///
    /// Returns the [`kube::Error`] reported by `kube::Client::try_default()`.
    pub async fn try_default() -> Result<Self, kube::Error> {
        kube::Client::try_default().await.map(Self::new)
    }
}

impl Service<TokenReviewSpec> for &Client {
    type Response = Option<TokenReviewStatus>;
    type Error = kube::Error;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: TokenReviewSpec) -> Self::Future {
        let client = self.client.clone();
        let cache = self.cache.clone();
        Box::pin(async move {
            if crate::exp(&request) > Some(Utc::now().timestamp()) {
                let status = cache.read().await.get(&request).cloned();
                if let Some(status) = status {
                    Ok(Some(status))
                } else {
                    let status = kube::Api::all(client)
                        .create(
                            &Default::default(),
                            &TokenReview {
                                spec: request.clone(),
                                ..Default::default()
                            },
                        )
                        .await?
                        .status;
                    if let Some(status) = &status {
                        cache.write().await.put(request, status.clone());
                    }
                    Ok(status)
                }
            } else {
                Ok(None)
            }
        })
    }
}

/// Owned-value counterpart of the `&Client` service: every method forwards to
/// the implementation on `&Client`.
impl Service<TokenReviewSpec> for Client {
    type Response = Option<TokenReviewStatus>;
    type Error = kube::Error;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    /// Forwards the readiness check to the `&Client` implementation.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let mut shared: &Client = self;
        <&Client as Service<TokenReviewSpec>>::poll_ready(&mut shared, cx)
    }

    /// Forwards `request` to the `&Client` implementation and returns its future.
    fn call(&mut self, request: TokenReviewSpec) -> Self::Future {
        let mut shared: &Client = self;
        <&Client as Service<TokenReviewSpec>>::call(&mut shared, request)
    }
}