use std::sync::Arc;
use async_trait::async_trait;
use kube::runtime::controller::Action;
use kube::runtime::watcher::Config;
use kube::{Api, Client, CustomResource};
use kuberator::cache::{CachingStrategy, StaticApiProvider};
use kuberator::error::Result as KubeResult;
use kuberator::k8s::K8sRepository;
use kuberator::{Context, Finalize, ObserveGeneration, Reconcile, TryResource};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
// Spec of the `MyResource` custom resource.
//
// The `CustomResource` derive together with the `#[kube(...)]` attribute below
// declares a namespaced kind `MyResource` in group `example.kuberator.io`,
// version `v1`, plural `myresources`, whose status type is `MyResourceStatus`.
// The `MyResource` type used throughout this file comes from that derive.
//
// NOTE(review): plain `//` comments are used on purpose; `///` doc comments on
// a `JsonSchema` type are presumably emitted as schema descriptions, which
// would change the generated CRD -- confirm before converting.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
#[kube(
group = "example.kuberator.io",
version = "v1",
kind = "MyResource",
plural = "myresources",
namespaced,
status = "MyResourceStatus"
)]
pub struct MyResourceSpec {
// Free-form message; the reconciler logs it when applying the resource.
pub message: String,
}
// Status sub-object of `MyResource` (referenced by `status = "MyResourceStatus"`
// in the `#[kube(...)]` attribute of the spec).
//
// NOTE(review): plain `//` comments are used on purpose; `///` doc comments on
// a `JsonSchema` type are presumably emitted as schema descriptions -- confirm
// before converting.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
pub struct MyResourceStatus {
// Textual state; this file writes "Ready" on apply and "Terminating" on cleanup.
pub state: String,
// Generation recorded through `ObserveGeneration::add`; `handle_apply` compares
// it with `metadata.generation` to skip generations that were already handled.
pub observed_generation: Option<i64>,
}
impl ObserveGeneration for MyResourceStatus {
    /// Records `observed_generation` on this status, overwriting any
    /// previously stored value.
    fn add(&mut self, observed_generation: i64) {
        let recorded = Some(observed_generation);
        self.observed_generation = recorded;
    }
}
/// Shorthand for the kuberator repository specialised to `MyResource` and
/// backed by a `StaticApiProvider`.
type MyK8sRepo = K8sRepository<MyResource, StaticApiProvider<MyResource>>;
/// Reconciliation context for `MyResource`; holds the shared repository that
/// the `Context` implementation uses for status updates.
struct MyContext {
/// Shared handle to the Kubernetes repository.
repo: Arc<MyK8sRepo>,
}
#[async_trait]
impl Context<MyResource, MyK8sRepo, StaticApiProvider<MyResource>> for MyContext {
    /// Returns another handle to the shared Kubernetes repository.
    fn k8s_repository(&self) -> Arc<MyK8sRepo> {
        Arc::clone(&self.repo)
    }

    /// Returns the finalizer string used for `MyResource` objects.
    fn finalizer(&self) -> &'static str {
        "myresources.example.kuberator.io/finalizer"
    }

    /// Handles an apply event for `object`.
    ///
    /// If the status already records an observed generation that is at least
    /// the object's current `metadata.generation`, nothing is done. Otherwise
    /// the spec message is logged and the status is updated to `"Ready"`.
    ///
    /// # Errors
    ///
    /// Propagates failures from `try_name`, `try_namespace` and
    /// `update_status`.
    async fn handle_apply(&self, object: Arc<MyResource>) -> KubeResult<Action> {
        let name = object.try_name()?;
        let namespace = object.try_namespace()?;
        tracing::info!("Reconciling MyResource {}/{}", namespace, name);

        // Skip the work when this generation was already reconciled. Both the
        // recorded and the current generation must be present for the check.
        let observed = object
            .status
            .as_ref()
            .and_then(|status| status.observed_generation);
        if let (Some(observed), Some(current)) = (observed, object.metadata.generation) {
            if observed >= current {
                tracing::info!("Already reconciled generation {}, skipping", current);
                return Ok(Action::await_change());
            }
        }

        tracing::info!("Processing new generation");
        tracing::info!("Message: {}", object.spec.message);

        // NOTE(review): `observed_generation` is left as `None` here; presumably
        // `update_status` fills it in through `ObserveGeneration::add` --
        // confirm against the kuberator documentation.
        let status = MyResourceStatus {
            state: "Ready".to_string(),
            observed_generation: None,
        };
        self.repo.update_status(&object, status).await?;
        tracing::info!("Status updated successfully");
        Ok(Action::await_change())
    }

    /// Handles a cleanup event for `object`.
    ///
    /// Tries to set the status to `"Terminating"`. The status update is
    /// best-effort: a failure is logged as a warning and does not fail the
    /// cleanup.
    ///
    /// # Errors
    ///
    /// Propagates failures from `try_name` and `try_namespace` only.
    async fn handle_cleanup(&self, object: Arc<MyResource>) -> KubeResult<Action> {
        let name = object.try_name()?;
        let namespace = object.try_namespace()?;
        tracing::info!("Cleaning up MyResource {}/{}", namespace, name);

        let status = MyResourceStatus {
            state: "Terminating".to_string(),
            observed_generation: None,
        };
        // Best-effort: do not block cleanup on a failed status write, but make
        // the failure visible instead of silently discarding it.
        if let Err(err) = self.repo.update_status(&object, status).await {
            tracing::warn!(
                "Failed to update status of MyResource {}/{} during cleanup: {}",
                namespace,
                name,
                err
            );
        }
        Ok(Action::await_change())
    }
}
/// Reconciler for `MyResource`; bundles the pieces that `Reconcile::destruct`
/// hands back.
struct MyReconciler {
/// Shared reconciliation context.
context: Arc<MyContext>,
/// API handle for the `MyResource` objects being reconciled.
crd_api: Api<MyResource>,
}
#[async_trait]
impl Reconcile<MyResource, MyContext, MyK8sRepo, StaticApiProvider<MyResource>> for MyReconciler {
    /// Consumes the reconciler and returns its parts: the `MyResource` API
    /// handle, a default watcher `Config`, and the shared context.
    fn destruct(self) -> (Api<MyResource>, Config, Arc<MyContext>) {
        let Self { context, crd_api } = self;
        let watcher_config = Config::default();
        (crd_api, watcher_config, context)
    }
}
/// Entry point of the example operator.
///
/// Initialises tracing (falling back to the `info` filter when no filter can
/// be built from the environment), creates a Kubernetes client, wires the
/// repository, context and reconciler together, and starts the reconciler.
///
/// # Errors
///
/// Returns an error if the Kubernetes client cannot be created.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Single source of truth for the namespace, so the API provider scope, the
    // watched `Api` and the log message cannot drift apart.
    const NAMESPACE: &str = "default";

    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .init();
    tracing::info!("Starting Operator with Status Example");

    let client = Client::try_default().await?;
    let api_provider =
        StaticApiProvider::new(client.clone(), vec![NAMESPACE], CachingStrategy::Strict);
    let k8s_repo = K8sRepository::new(api_provider);
    let context = MyContext {
        repo: Arc::new(k8s_repo),
    };
    let reconciler = MyReconciler {
        context: Arc::new(context),
        crd_api: Api::namespaced(client, NAMESPACE),
    };

    tracing::info!("Watching MyResources in '{}' namespace", NAMESPACE);
    tracing::info!("This example requires the MyResource CRD to be installed");

    // `None` is passed for the optional argument; the turbofish only names the
    // future type that argument would have had.
    // NOTE(review): presumably this is the shutdown signal -- confirm against
    // the kuberator documentation.
    reconciler.start::<futures::future::Ready<()>>(None).await;
    Ok(())
}