use kube::api::{PatchParams, Resource, ResourceExt};
use kube::Api;
use serde::de::DeserializeOwned;
use tracing::Instrument;
use tracing::{debug, error, trace, warn};
use crate::object::ObjectStatus;
use crate::Manifest;
pub use crate::object::ObjectState as ResourceState;
pub struct StateHolder<S: ResourceState> {
pub(crate) state: Box<dyn State<S>>,
}
impl<S: ResourceState> From<StateHolder<S>> for Box<dyn State<S>> {
fn from(holder: StateHolder<S>) -> Box<dyn State<S>> {
holder.state
}
}
pub enum Transition<S: ResourceState> {
Next(StateHolder<S>),
Complete(anyhow::Result<()>),
}
pub trait TransitionTo<S> {}
impl<S: ResourceState> Transition<S> {
#[allow(clippy::boxed_local)]
pub fn next<I: State<S>, O: State<S>>(_i: Box<I>, o: O) -> Transition<S>
where
I: TransitionTo<O>,
{
Transition::Next(StateHolder { state: Box::new(o) })
}
#[allow(clippy::boxed_local)]
pub fn next_unchecked<I: State<S>, O: State<S>>(_i: Box<I>, o: O) -> Transition<S> {
Transition::Next(StateHolder { state: Box::new(o) })
}
}
pub type SharedState<T> = std::sync::Arc<tokio::sync::RwLock<T>>;
#[async_trait::async_trait]
pub trait State<S: ResourceState>: Sync + Send + 'static + std::fmt::Debug {
async fn next(
self: Box<Self>,
shared: SharedState<S::SharedState>,
state: &mut S,
manifest: Manifest<S::Manifest>,
) -> Transition<S>;
async fn status(&self, state: &mut S, manifest: &S::Manifest) -> anyhow::Result<S::Status>;
}
pub async fn run_to_completion<S: ResourceState>(
client: &kube::Client,
state: impl State<S>,
shared: SharedState<S::SharedState>,
object_state: &mut S,
manifest: Manifest<S::Manifest>,
) where
S::Manifest: Resource + DeserializeOwned,
<S::Manifest as kube::Resource>::DynamicType: std::default::Default,
S::Status: ObjectStatus,
{
let (name, namespace, api) = {
let initial_manifest = manifest.latest();
let namespace = initial_manifest.namespace();
let name = initial_manifest.name();
let api: Api<S::Manifest> = match namespace {
Some(ref namespace) => Api::namespaced(client.clone(), namespace),
None => Api::all(client.clone()),
};
(name, namespace, api)
};
let mut state: Box<dyn State<S>> = Box::new(state);
loop {
state = match execute_object_state(
&name,
&namespace,
state,
&api,
&shared,
object_state,
&manifest,
)
.await
{
Some(state) => state,
None => break,
}
}
}
#[tracing::instrument(level = "trace", skip(object_state, manifest, api, shared))]
async fn execute_object_state<S: ResourceState>(
name: &str,
namespace: &Option<String>,
state: Box<dyn State<S>>,
api: &Api<S::Manifest>,
shared: &SharedState<S::SharedState>,
object_state: &mut S,
manifest: &Manifest<S::Manifest>,
) -> Option<Box<dyn State<S>>>
where
S::Manifest: Resource + DeserializeOwned,
S::Status: ObjectStatus,
{
let latest_manifest = manifest.latest();
let span = tracing::debug_span!("State::status");
match state
.status(object_state, &latest_manifest)
.instrument(span)
.await
{
Ok(status) => {
patch_status(api, name, status).await;
}
Err(error) => {
warn!(?error, "Object status patch returned error.",);
}
}
let transition = {
let span = tracing::trace_span!("State::next",);
state
.next(shared.clone(), object_state, manifest.clone())
.instrument(span)
.await
};
match transition {
Transition::Next(s) => {
let next_state = s.into();
trace!(?next_state, "Object transitioning to new state",);
Some(next_state)
}
Transition::Complete(result) => match result {
Ok(()) => {
debug!("Object state machine exited without error.",);
None
}
Err(error) => {
error!(?error, "Object state machine exited with error.",);
let status = S::Status::failed(&format!("{:?}", error));
patch_status(api, name, status).await;
None
}
},
}
}
#[tracing::instrument(level = "trace", skip(api, name, status))]
pub async fn patch_status<R: Resource + Clone + DeserializeOwned, S: ObjectStatus>(
api: &Api<R>,
name: &str,
status: S,
) {
let patch = status.json_patch();
debug!(
%name,
%patch,
"Applying status patch to object."
);
match api
.patch_status(
name,
&PatchParams::default(),
&kube::api::Patch::Merge(patch),
)
.await
{
Ok(_) => (),
Err(error) => {
warn!(
%name,
?error,
"Object error patching status."
);
}
}
}
pub mod test {
use super::*;
use crate::ObjectState;
#[derive(Default, Debug)]
pub struct Stub;
#[async_trait::async_trait]
impl<
Resource: Send + Sync + Clone + std::marker::Unpin,
Status: Default + 'static,
ResourceState: ObjectState<Manifest = Resource, Status = Status>,
> State<ResourceState> for Stub
{
async fn next(
self: Box<Self>,
_shared_state: SharedState<ResourceState::SharedState>,
_pod_state: &mut ResourceState,
_pod: Manifest<Resource>,
) -> Transition<ResourceState> {
Transition::Complete(Ok(()))
}
async fn status(
&self,
_state: &mut ResourceState,
_pod: &Resource,
) -> anyhow::Result<Status> {
Ok(Default::default())
}
}
}