use crate::{Client, Error, Result};
use k8s_openapi::{
api::core::v1::{LocalObjectReference, Namespace as k8sNs, ObjectReference},
apimachinery::pkg::apis::meta::v1::OwnerReference,
};
use kube_core::{
object::ObjectList,
params::{GetParams, ListParams},
request::Request,
ApiResource, ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource,
};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
// Private marker traits over the `kube_core` scope types. In this file they
// are used only as `K::Scope: ...` bounds on the URL/reference impls below.
/// Marker for resource scopes accepted by impls bounded on `K::Scope: ClusterScope`.
trait ClusterScope {}
/// Marker for resource scopes accepted by impls bounded on `K::Scope: NamespaceScope`.
trait NamespaceScope {}
impl ClusterScope for ClusterResourceScope {}
impl NamespaceScope for NamespaceResourceScope {}
// `DynamicResourceScope` carries both markers, so it satisfies either bound.
impl NamespaceScope for DynamicResourceScope {}
impl ClusterScope for DynamicResourceScope {}
/// A scope that can produce the URL path of a collection of `K`.
///
/// `Client::list` accepts any `&impl CollectionUrl<K>` and builds its list
/// request from the returned path.
pub trait CollectionUrl<K> {
/// Returns the URL path used as the base of the request.
fn url_path(&self) -> String;
}
/// A scope or reference that can produce the URL path under which a single
/// `K` is looked up.
///
/// `Client::get` accepts any `&impl ObjectUrl<K>` and appends the object name
/// through `Request::get`.
pub trait ObjectUrl<K> {
/// Returns the URL path used as the base of the request.
fn url_path(&self) -> String;
}
/// Unit scope type that carries no namespace.
///
/// Its `CollectionUrl` and `ObjectUrl` impls in this file call `K::url_path`
/// with `None` as the namespace.
#[derive(Debug, Clone)]
pub struct Cluster;
/// Scope type wrapping a namespace name.
///
/// The inner `String` is private; values are built through the `From<&str>`,
/// `From<String>` and `TryFrom<&k8sNs>` impls in this file.
#[derive(Debug, Clone)]
pub struct Namespace(String);
/// A reference to a single `K`: an [`ObjectUrl`] that may also know the
/// object's name.
///
/// `Client::fetch` accepts any `&impl ObjectRef<K>` and fails with
/// `Error::RefResolve` when `name` returns `None`.
pub trait ObjectRef<K>: ObjectUrl<K> {
/// Returns the referenced object's name, or `None` when the reference has none.
fn name(&self) -> Option<&str>;
}
/// References that can be combined with a namespace to form an [`ObjectRef`].
pub trait NamespacedRef<K> {
/// Returns a reference to the same object with its namespace set to
/// `namespace` (anything convertible into `Option<String>`).
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K>;
}
/// URL resolution for a full `ObjectReference`.
impl<K> ObjectUrl<K> for ObjectReference
where
    K: Resource,
{
    /// Builds the path from the reference's own group/version/kind and its
    /// `namespace` field (no namespace segment when the field is `None`).
    fn url_path(&self) -> String {
        // The reference is cloned because the GVK conversion consumes it.
        let resource = ApiResource::from_gvk(&self.clone().into());
        url_path(&resource, self.namespace.clone())
    }
}
/// Name lookup for a full `ObjectReference`.
impl<K> ObjectRef<K> for ObjectReference
where
    K: Resource,
{
    /// Borrows the optional `name` field as a string slice.
    fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }
}
/// Re-targets an `ObjectReference` at a different namespace.
impl<K> NamespacedRef<K> for ObjectReference
where
    K: Resource,
    K::Scope: NamespaceScope,
{
    /// Returns a copy of this reference whose `namespace` field is replaced
    /// by `namespace`; every other field is cloned unchanged.
    fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K> {
        let namespace: Option<String> = namespace.into();
        let mut scoped = self.clone();
        scoped.namespace = namespace;
        scoped
    }
}
/// URL resolution for an `OwnerReference`, available only when `K::Scope`
/// is a `ClusterScope`.
impl<K> ObjectUrl<K> for OwnerReference
where
    K: Resource,
    K::Scope: ClusterScope,
{
    /// Builds the path from the owner's group/version/kind with no namespace
    /// segment.
    fn url_path(&self) -> String {
        // The reference is cloned because the GVK conversion consumes it.
        let resource = ApiResource::from_gvk(&self.clone().into());
        url_path(&resource, None)
    }
}
/// Name lookup for an `OwnerReference`, available only when `K::Scope` is a
/// `ClusterScope`.
impl<K> ObjectRef<K> for OwnerReference
where
    K: Resource,
    K::Scope: ClusterScope,
{
    /// Always returns `Some`: the `name` field of an `OwnerReference` is not
    /// optional.
    fn name(&self) -> Option<&str> {
        Some(self.name.as_str())
    }
}
impl<K> NamespacedRef<K> for OwnerReference
where
K: Resource,
K::Scope: NamespaceScope,
{
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K> {
ObjectReference {
api_version: self.api_version.clone().into(),
namespace: namespace.into(),
name: self.name.clone().into(),
uid: self.uid.clone().into(),
kind: self.kind.clone().into(),
..Default::default()
}
}
}
impl<K> NamespacedRef<K> for LocalObjectReference
where
K: Resource,
K::DynamicType: Default,
K::Scope: NamespaceScope,
{
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K> {
let dt = Default::default();
ObjectReference {
api_version: K::api_version(&dt).to_string().into(),
namespace: namespace.into(),
name: Some(self.name.clone()),
kind: K::kind(&dt).to_string().into(),
..Default::default()
}
}
}
/// Re-exports the scope types (`Cluster`, `Namespace`) and the
/// `NamespacedRef` trait under a single `scope` path.
pub mod scope {
pub use super::{Cluster, Namespace, NamespacedRef};
}
/// Collection URLs without a namespace segment. There is no bound on
/// `K::Scope`, so this impl applies to every `K` with a default dynamic type.
impl<K> CollectionUrl<K> for Cluster
where
    K: Resource,
    K::DynamicType: Default,
{
    /// Returns `K`'s URL path with no namespace.
    fn url_path(&self) -> String {
        let dyn_type = K::DynamicType::default();
        K::url_path(&dyn_type, None)
    }
}
/// Object URLs without a namespace segment, available only when `K::Scope`
/// is a `ClusterScope`.
impl<K> ObjectUrl<K> for Cluster
where
    K: Resource,
    K::DynamicType: Default,
    K::Scope: ClusterScope,
{
    /// Returns `K`'s URL path with no namespace.
    fn url_path(&self) -> String {
        let dyn_type = K::DynamicType::default();
        K::url_path(&dyn_type, None)
    }
}
/// Collection URLs inside one namespace, available only when `K::Scope` is a
/// `NamespaceScope`.
impl<K> CollectionUrl<K> for Namespace
where
    K: Resource,
    K::DynamicType: Default,
    K::Scope: NamespaceScope,
{
    /// Returns `K`'s URL path for the wrapped namespace name.
    fn url_path(&self) -> String {
        let dyn_type = K::DynamicType::default();
        K::url_path(&dyn_type, Some(self.0.as_str()))
    }
}
/// Object URLs inside one namespace, available only when `K::Scope` is a
/// `NamespaceScope`.
impl<K> ObjectUrl<K> for Namespace
where
    K: Resource,
    K::DynamicType: Default,
    K::Scope: NamespaceScope,
{
    /// Returns `K`'s URL path for the wrapped namespace name.
    fn url_path(&self) -> String {
        let dyn_type = K::DynamicType::default();
        K::url_path(&dyn_type, Some(self.0.as_str()))
    }
}
/// Builds a [`Namespace`] scope from a Kubernetes namespace object.
impl TryFrom<&k8sNs> for Namespace {
    type Error = NamespaceError;

    /// Copies the object's metadata name into a new `Namespace`.
    ///
    /// # Errors
    /// Returns `NamespaceError::MissingName` when the metadata has no name.
    fn try_from(ns: &k8sNs) -> Result<Namespace, Self::Error> {
        match ns.meta().name.as_deref() {
            Some(name) => Ok(Namespace(name.to_owned())),
            None => Err(NamespaceError::MissingName),
        }
    }
}
impl From<&str> for Namespace {
fn from(ns: &str) -> Namespace {
Namespace(ns.to_owned())
}
}
impl From<String> for Namespace {
fn from(ns: String) -> Namespace {
Namespace(ns)
}
}
/// Error returned when converting a Kubernetes namespace object into a
/// [`Namespace`] scope.
#[derive(thiserror::Error, Debug)]
pub enum NamespaceError {
/// The namespace object's metadata has no name.
#[error("Missing Namespace Name")]
MissingName,
}
impl Client {
pub async fn get<K>(&self, name: &str, scope: &impl ObjectUrl<K>) -> Result<K>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
<K as Resource>::DynamicType: Default,
{
let mut req = Request::new(scope.url_path())
.get(name, &GetParams::default())
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("get");
self.request::<K>(req).await
}
pub async fn fetch<K>(&self, reference: &impl ObjectRef<K>) -> Result<K>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
{
let mut req = Request::new(reference.url_path())
.get(
reference
.name()
.ok_or(Error::RefResolve("Reference is empty".to_string()))?,
&GetParams::default(),
)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("get");
self.request::<K>(req).await
}
pub async fn list<K>(&self, lp: &ListParams, scope: &impl CollectionUrl<K>) -> Result<ObjectList<K>>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
<K as Resource>::DynamicType: Default,
{
let mut req = Request::new(scope.url_path())
.list(lp)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("list");
self.request::<ObjectList<K>>(req).await
}
}
/// Builds the URL path for the resource described by `r`.
///
/// The path starts with `/api` when `r.group` is empty and `/apis` otherwise,
/// followed by `r.api_version`, an optional `namespaces/<ns>/` segment, and
/// `r.plural`.
fn url_path(r: &ApiResource, namespace: Option<String>) -> String {
    let prefix = if r.group.is_empty() { "api" } else { "apis" };
    // Empty when no namespace is given, so the plural follows the version directly.
    let ns_segment = namespace
        .map(|ns| format!("namespaces/{ns}/"))
        .unwrap_or_default();
    format!("/{prefix}/{}/{ns_segment}{}", r.api_version, r.plural)
}
// Integration tests for the scope/reference based `Client` helpers.
// Every test is `#[ignore]`d because it needs a reachable cluster.
#[cfg(test)]
mod test {
use crate::{
client::{
client_ext::NamespacedRef as _,
scope::{Cluster, Namespace},
},
Client,
};
use super::ListParams;
use k8s_openapi::api::core::v1::LocalObjectReference;
use kube_core::{DynamicObject, Resource as _, ResourceExt as _};
// Exercises `Client::list` and `Client::get` with both the `Cluster` scope
// and `Namespace` scopes.
#[tokio::test]
#[ignore = "needs cluster (will list/get namespaces, pods, jobs, svcs, clusterroles)"]
async fn client_ext_list_get_pods_svcs() -> Result<(), Box<dyn std::error::Error>> {
use k8s_openapi::api::{
batch::v1::Job,
core::v1::{Namespace as k8sNs, Pod, Service},
rbac::v1::ClusterRole,
};
let client = Client::try_default().await?;
let lp = ListParams::default();
// List namespaces through the `Cluster` scope, then pods inside each one.
for ns in client.list::<k8sNs>(&lp, &Cluster).await? {
for p in client.list::<Pod>(&lp, &Namespace::try_from(&ns)?).await? {
println!("Found pod {} in {}", p.name_any(), ns.name_any());
}
}
// Jobs are listed through the `Cluster` scope, i.e. without a namespace.
for j in client.list::<Job>(&lp, &Cluster).await? {
println!("Found job {} in {}", j.name_any(), j.namespace().unwrap());
}
// Get by name inside a namespace built from a string.
let default: Namespace = "default".into();
let svc = client.get::<Service>("kubernetes", &default).await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// Get by name through the `Cluster` scope.
let ca = client.get::<ClusterRole>("cluster-admin", &Cluster).await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");
Ok(())
}
// Exercises `Client::fetch` with `ObjectReference`, `LocalObjectReference`
// (via `within`) and `OwnerReference` inputs.
#[tokio::test]
#[ignore = "needs cluster (will get svcs, clusterroles, pods, nodes)"]
async fn client_ext_fetch_ref_pods_svcs() -> Result<(), Box<dyn std::error::Error>> {
use k8s_openapi::api::{
core::v1::{Node, ObjectReference, Pod, Service},
rbac::v1::ClusterRole,
};
let client = Client::try_default().await?;
// Hand-built reference that includes a namespace.
let svc: Service = client
.fetch(&ObjectReference {
kind: Some(Service::kind(&()).into()),
api_version: Some(Service::api_version(&()).into()),
name: Some("kubernetes".into()),
namespace: Some("default".into()),
..Default::default()
})
.await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// Hand-built reference without a namespace.
let ca: ClusterRole = client
.fetch(&ObjectReference {
kind: Some(ClusterRole::kind(&()).into()),
api_version: Some(ClusterRole::api_version(&()).into()),
name: Some("cluster-admin".into()),
..Default::default()
})
.await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");
// References produced by `object_ref` can be fetched as `DynamicObject`.
let svc: DynamicObject = client.fetch(&svc.object_ref(&())).await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
let ca: DynamicObject = client.fetch(&ca.object_ref(&())).await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");
// A name-only reference becomes fetchable once `within` adds a namespace.
let svc: Service = client
.fetch(
&LocalObjectReference {
name: svc.name_any().into(),
}
.within(svc.namespace()),
)
.await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// For each pod labelled `component=kube-apiserver` in `kube-system`,
// fetch its `Node` owner directly from the owner reference.
let kube_system: Namespace = "kube-system".into();
for pod in client
.list::<Pod>(
&ListParams::default().labels("component=kube-apiserver"),
&kube_system,
)
.await?
{
let owner = pod
.owner_references()
.iter()
.find(|r| r.kind == Node::kind(&()))
.ok_or("Not found")?;
let _: Node = client.fetch(owner).await?;
}
Ok(())
}
// Checks the errors produced when a reference to a `Service` is fetched or
// parsed as a `Pod`.
#[tokio::test]
#[ignore = "needs cluster (will get svcs, clusterroles, pods, nodes)"]
async fn fetch_fails() -> Result<(), Box<dyn std::error::Error>> {
use crate::error::Error;
use k8s_openapi::api::core::v1::{ObjectReference, Pod, Service};
let client = Client::try_default().await?;
let svc: Service = client
.fetch(&ObjectReference {
kind: Some(Service::kind(&()).into()),
api_version: Some(Service::api_version(&()).into()),
name: Some("kubernetes".into()),
namespace: Some("default".into()),
..Default::default()
})
.await?;
// Fetching with the wrong target type is expected to fail in deserialization.
let err = client.fetch::<Pod>(&svc.object_ref(&())).await.unwrap_err();
assert!(matches!(err, Error::SerdeError(_)));
assert_eq!(err.to_string(), "Error deserializing response: invalid value: string \"Service\", expected Pod at line 1 column 17".to_string());
// Fetching as `DynamicObject` succeeds; the kind mismatch shows up on `try_parse`.
let obj: DynamicObject = client.fetch(&svc.object_ref(&())).await?;
let err = obj.try_parse::<Pod>().unwrap_err();
assert_eq!(err.to_string(), "failed to parse this DynamicObject into a Resource: invalid value: string \"Service\", expected Pod".to_string());
Ok(())
}
}