Skip to main content

kube_utils/
config.rs

1use std::{net, env, fs, time};
2use std::borrow::Cow;
3use core::fmt;
4
5use crate::Uri;
6
//Names of the environment variables kubernetes sets for a pod, so that it can access internal API
const KUBERNETES_SERVICE_HOST: &str = "KUBERNETES_SERVICE_HOST";
const KUBERNETES_SERVICE_PORT: &str = "KUBERNETES_SERVICE_PORT";

//Locations of the service account files kubernetes provides within pod environment
const SERVICE_TOKENFILE: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token";
const SERVICE_CERTFILE: &str = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
const SERVICE_DEFAULT_NS: &str = "/var/run/secrets/kubernetes.io/serviceaccount/namespace";
15
16#[derive(Debug)]
17///Possible errors reading kube config
18pub enum KubeError {
19    ///Missing env variables KUBERNETES_SERVICE_HOST
20    MissingServiceHost,
21    ///Missing env variables KUBERNETES_SERVICE_PORT
22    MissingServicePort,
23    ///Service port is not valid port
24    InvalidServicePort,
25    ///Unable to construct valid URI out of host and port
26    InvalidServiceUri(ureq::http::Error),
27    ///Unable to read ca.crt
28    UnableReadCert,
29    ///ca.crt is not valid PEM file
30    InvalidCert,
31    ///namespace is not valid utf-8 file
32    UnableReadNamespace,
33    ///token is not valid utf-8 file
34    UnableReadToken,
35}
36
37impl fmt::Display for KubeError {
38    #[inline]
39    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            Self::MissingServiceHost => fmt.write_fmt(format_args!("env::{KUBERNETES_SERVICE_HOST} is missing")),
42            Self::MissingServicePort => fmt.write_fmt(format_args!("env::{KUBERNETES_SERVICE_PORT} is missing")),
43            Self::InvalidServicePort => fmt.write_fmt(format_args!("env::{KUBERNETES_SERVICE_PORT} is not valid port")),
44            Self::InvalidServiceUri(error) => fmt.write_fmt(format_args!("Unable to construct valid URI: {error}")),
45            Self::UnableReadCert => fmt.write_fmt(format_args!("{SERVICE_CERTFILE}: failed to read")),
46            Self::InvalidCert => fmt.write_fmt(format_args!("{SERVICE_CERTFILE}: not a valid PEM certificate")),
47            Self::UnableReadNamespace => fmt.write_fmt(format_args!("{SERVICE_DEFAULT_NS}: not a valid utf-8 file")),
48            Self::UnableReadToken => fmt.write_fmt(format_args!("{SERVICE_TOKENFILE}: not a valid utf-8 file")),
49        }
50    }
51}
52
53#[inline(always)]
54fn read_token(path: &str) -> Result<String, KubeError> {
55    fs::read_to_string(path).map_err(|_| KubeError::UnableReadToken)
56}
57
///Auth token of the local cluster, loaded from a file on the mounted file system
///
///Loaded token is served as is until it is considered expired (1 minute), after which it is
///eligible for reload from the file.
///Should reload fail, the previously loaded token remains in use (actual token validity is within 10 minutes)
pub struct ClusterToken {
    ///Path to the file holding the token
    file: Cow<'static, str>,
    ///Most recent successfully read content of `file`
    token: String,
    ///Moment from which the 1 minute expiration window is counted
    last_fetched_at: time::Instant,
}
67
68impl ClusterToken {
69    #[inline(always)]
70    ///Creates new [ClusterToken] by performing initial fetch of the token at `file` location
71    ///
72    ///Returns error if unable to fetch.
73    ///
74    ///After this, `file` is assumed to be always valid, but if re-fetch fails, it will use
75    ///existing token
76    pub fn new_token(file: Cow<'static, str>) -> Result<Self, KubeError> {
77        let token = read_token(file.as_ref())?;
78        Ok(Self {
79            file,
80            token,
81            last_fetched_at: time::Instant::now()
82        })
83    }
84
85    #[inline(always)]
86    ///Checks if token is expired, returning `true` if that's the case
87    pub fn is_expired(&self) -> bool {
88        self.last_fetched_at.elapsed() >= time::Duration::from_secs(60)
89    }
90
91    ///Force refresh token, returning `false` if failed to read file
92    pub fn refresh(&mut self) -> bool {
93        if let Ok(new_token) = read_token(self.file.as_ref()) {
94            self.token = new_token;
95            true
96        } else {
97            false
98        }
99    }
100
101    #[inline(always)]
102    ///Requests to perform token refresh if it expires
103    ///
104    ///Returns `true` if token has been refreshed
105    pub fn refresh_if_expired(&mut self) -> bool {
106        if self.is_expired() {
107            self.refresh()
108        } else {
109            false
110        }
111    }
112
113    #[inline(always)]
114    ///Returns current token
115    pub fn token(&self) -> &str {
116        self.token.as_str()
117    }
118}
119
120fn build_kube_uri(host: &str, port: u16) -> Result<Uri, KubeError> {
121    let uri = Uri::builder().scheme("https");
122    let uri = match host.parse::<net::IpAddr>() {
123        Ok(ip) => {
124            if port == 443 {
125                if ip.is_ipv6() {
126                    let host = format!("[{ip}]");
127                    uri.authority(host.as_str())
128                } else {
129                    uri.authority(host)
130                }
131            } else {
132                let addr = net::SocketAddr::new(ip, port);
133                uri.authority(addr.to_string().as_str())
134            }
135        }
136        Err(_) => {
137            if port == 443 {
138                uri.authority(host)
139            } else {
140                let host = format!("{host}:{port}");
141                uri.authority(host.as_str())
142            }
143        }
144    };
145
146    uri.build().map_err(|error| KubeError::InvalidServiceUri(error))
147}
148
149///Kubernetes API config
150pub struct KubeConfig {
151    pub(crate) uri: Uri,
152    pub(crate) certs: Vec<ureq::tls::Certificate<'static>>,
153    pub(crate) namespace: String,
154    pub(crate) auth_token: ClusterToken,
155}
156
157impl KubeConfig {
158    ///Creates kubeconfig based on in-cluster environment
159    ///
160    ///Returns `None` if environment is not valid or missing some environment variables
161    pub fn in_cluster_env() -> Result<Self, KubeError> {
162        let host = env::var(KUBERNETES_SERVICE_HOST).map_err(|_| KubeError::MissingServiceHost)?;
163        let port: u16 = env::var(KUBERNETES_SERVICE_PORT).map_err(|_| KubeError::MissingServicePort).and_then(|port| port.parse().map_err(|_| KubeError::InvalidServicePort))?;
164        let uri = build_kube_uri(&host, port)?;
165        let cert = fs::read(SERVICE_CERTFILE).map_err(|_| KubeError::UnableReadCert)?;
166        let certs = ureq::tls::parse_pem(&cert).filter_map(|pem| match pem {
167            Ok(ureq::tls::PemItem::Certificate(cert)) => Some(cert),
168            _ => None
169        }).collect();
170
171        let auth_token = ClusterToken::new_token(SERVICE_TOKENFILE.into())?;
172        let namespace = fs::read_to_string(SERVICE_DEFAULT_NS).map_err(|_| KubeError::UnableReadNamespace)?;
173        let result = Self {
174            uri,
175            certs,
176            namespace,
177            auth_token,
178        };
179
180        if result.certs.is_empty() {
181            return Err(KubeError::InvalidCert);
182        }
183
184        Ok(result)
185    }
186}
187
///General HTTP config
#[derive(Debug, Copy, Clone)]
pub struct HttpConfig {
    ///Timeout on requests
    pub(crate) timeout: time::Duration,
}

impl HttpConfig {
    ///Timeout used unless overridden via [with_timeout](Self::with_timeout)
    const DEFAULT_TIMEOUT: time::Duration = time::Duration::from_secs(10);

    ///Creates new default config
    #[inline]
    pub const fn new() -> Self {
        Self {
            timeout: Self::DEFAULT_TIMEOUT,
        }
    }

    ///Sets `timeout` on requests.
    ///
    ///Defaults to `10s`
    #[inline]
    #[must_use]
    pub const fn with_timeout(mut self, timeout: time::Duration) -> Self {
        self.timeout = timeout;
        self
    }
}

impl Default for HttpConfig {
    ///Equivalent of [HttpConfig::new]
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}