use std::fmt::Display;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures_util::future::ready;
use futures_util::future::FutureExt;
use futures_util::stream::once;
use futures_util::stream::BoxStream;
use futures_util::stream::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
use tracing::debug;
use tracing::trace;
use k8_diff::{Changes, Diff};
use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
use k8_types::options::DeleteOptions;
use crate::diff::PatchMergeType;
use crate::{ApplyResult, DiffableK8Obj};
#[derive(Clone)]
pub enum NameSpace {
All,
Named(String),
}
#[derive(Debug)]
pub struct ObjectKeyNotFound {
key: String,
}
impl ObjectKeyNotFound {
pub fn new(key: String) -> Self {
Self { key }
}
}
impl std::error::Error for ObjectKeyNotFound {}
impl Display for ObjectKeyNotFound {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "'{}' not found", self.key)
}
}
impl NameSpace {
pub fn is_all(&self) -> bool {
matches!(self, Self::All)
}
pub fn named(&self) -> &str {
match self {
Self::All => "all",
Self::Named(name) => name,
}
}
}
impl From<String> for NameSpace {
fn from(namespace: String) -> Self {
NameSpace::Named(namespace)
}
}
impl From<&str> for NameSpace {
fn from(namespace: &str) -> Self {
NameSpace::Named(namespace.to_owned())
}
}
#[derive(Default, Clone)]
pub struct ListArg {
pub field_selector: Option<String>,
pub include_uninitialized: Option<bool>,
pub label_selector: Option<String>,
}
pub type TokenStreamResult<S> = Result<Vec<Result<K8Watch<S>>>>;
#[allow(clippy::redundant_closure)]
pub fn as_token_stream_result<S, E>(events: Vec<K8Watch<S>>) -> TokenStreamResult<S>
where
S: Spec,
S::Status: Serialize + DeserializeOwned,
S::Header: Serialize + DeserializeOwned,
{
Ok(events.into_iter().map(|event| Ok(event)).collect())
}
#[async_trait]
pub trait MetadataClient: Send + Sync {
async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
where
S: Spec,
M: K8Meta + Send + Sync;
async fn retrieve_items<S, N>(&self, namespace: N) -> Result<K8List<S>>
where
S: Spec,
N: Into<NameSpace> + Send + Sync,
{
self.retrieve_items_with_option(namespace, None).await
}
async fn retrieve_items_with_option<S, N>(
&self,
namespace: N,
option: Option<ListArg>,
) -> Result<K8List<S>>
where
S: Spec,
N: Into<NameSpace> + Send + Sync;
fn retrieve_items_in_chunks<'a, S, N>(
self: Arc<Self>,
namespace: N,
limit: u32,
option: Option<ListArg>,
) -> BoxStream<'a, K8List<S>>
where
S: Spec + 'static,
N: Into<NameSpace> + Send + Sync + 'static;
async fn delete_item_with_option<S, M>(
&self,
metadata: &M,
option: Option<DeleteOptions>,
) -> Result<DeleteStatus<S>>
where
S: Spec,
M: K8Meta + Send + Sync;
async fn delete_item<S, M>(&self, metadata: &M) -> Result<DeleteStatus<S>>
where
S: Spec,
M: K8Meta + Send + Sync,
{
self.delete_item_with_option::<S, M>(metadata, None).await
}
async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
where
S: Spec;
async fn apply<S>(&self, value: InputK8Obj<S>) -> Result<ApplyResult<S>>
where
S: Spec,
{
debug!("{}: applying '{}' changes", S::label(), value.metadata.name);
trace!("{}: applying {:#?}", S::label(), value);
match self.retrieve_item(&value.metadata).await {
Ok(Some(old_item)) => {
let mut old_spec: S = old_item.spec;
old_spec.make_same(&value.spec);
let new_obj = serde_json::to_value(DiffableK8Obj::new(
value.metadata.clone(),
value.spec.clone(),
value.header.clone(),
))?;
let old_obj = serde_json::to_value(DiffableK8Obj::new(
old_item.metadata,
old_spec,
old_item.header,
))?;
let diff = old_obj.diff(&new_obj)?;
match diff {
Diff::None => {
debug!("{}: no diff detected, doing nothing", S::label());
Ok(ApplyResult::None)
}
Diff::Patch(p) => {
let json_diff = serde_json::to_value(p)?;
debug!("{}: detected diff: old vs. new obj", S::label());
trace!("{}: new obj: {:#?}", S::label(), &new_obj);
trace!("{}: old obj: {:#?}", S::label(), &old_obj);
trace!("{}: new/old diff: {:#?}", S::label(), json_diff);
let patch_result = self.patch_obj(&value.metadata, &json_diff).await?;
Ok(ApplyResult::Patched(patch_result))
}
_ => Err(anyhow!("unsupported diff type")),
}
}
Ok(None) => {
debug!(
"{}: item '{}' not found, creating ...",
S::label(),
value.metadata.name
);
let created_item = self.create_item(value).await?;
Ok(ApplyResult::Created(created_item))
}
Err(err) => Err(err),
}
}
async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
where
S: Spec;
async fn patch_obj<S, M>(&self, metadata: &M, patch: &Value) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync,
{
self.patch(metadata, patch, PatchMergeType::for_spec(S::metadata()))
.await
}
async fn patch<S, M>(
&self,
metadata: &M,
patch: &Value,
merge_type: PatchMergeType,
) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync;
async fn patch_status<S, M>(
&self,
metadata: &M,
patch: &Value,
merge_type: PatchMergeType,
) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync;
fn watch_stream_since<S, N>(
&self,
namespace: N,
resource_version: Option<String>,
) -> BoxStream<'_, TokenStreamResult<S>>
where
S: Spec + 'static,
N: Into<NameSpace>;
fn watch_stream_now<S>(&self, ns: String) -> BoxStream<'_, TokenStreamResult<S>>
where
S: Spec + 'static,
{
let ft_stream = async move {
let namespace = ns.as_ref();
match self.retrieve_items_with_option(namespace, None).await {
Ok(item_now_list) => {
let resource_version = item_now_list.metadata.resource_version;
let items_watch_stream =
self.watch_stream_since(namespace, Some(resource_version));
let items_list = item_now_list
.items
.into_iter()
.map(|item| Ok(K8Watch::ADDED(item)))
.collect();
let list_stream = once(ready(Ok(items_list)));
list_stream.chain(items_watch_stream).left_stream()
}
Err(err) => once(ready(Err(err))).right_stream(),
}
};
ft_stream.flatten_stream().boxed()
}
async fn exists<S, M>(&self, metadata: &M) -> Result<bool>
where
S: Spec,
M: K8Meta + Display + Send + Sync,
{
debug!("check if '{}' exists", metadata);
match self.retrieve_item::<S, M>(metadata).await {
Ok(Some(_)) => Ok(true),
Ok(None) => Ok(false),
Err(err) => Err(err),
}
}
}