use std::sync::Arc;
use async_trait::async_trait;
use kube::runtime::controller::Action;
use kube::runtime::watcher::Config;
use kube::{Api, Client, CustomResource};
use kuberator::cache::{CachingStrategy, StaticApiProvider};
use kuberator::error::Result as KubeResult;
use kuberator::k8s::K8sRepository;
use kuberator::{AsStatusError, Context, Finalize, ObserveGeneration, Reconcile, TryResource, WithStatusError};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use thiserror::Error as ThisError;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
#[kube(
group = "example.kuberator.io",
version = "v1",
kind = "MyApp",
plural = "myapps",
namespaced,
status = "MyAppStatus"
)]
pub struct MyAppSpec {
pub replicas: i32,
pub image: String,
}
#[derive(ThisError, Debug)]
pub enum MyAppError {
#[error("Invalid replica count: {0}")]
InvalidReplicas(i32),
#[error("Invalid image format: {0}")]
InvalidImage(String),
#[error("External service unavailable")]
ServiceUnavailable,
}
impl From<MyAppError> for kuberator::error::Error {
fn from(error: MyAppError) -> kuberator::error::Error {
kuberator::error::Error::Anyhow(anyhow::anyhow!(error))
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
pub struct MyAppStatusError {
pub code: String,
pub message: String,
}
impl AsStatusError<MyAppStatusError> for MyAppError {
fn as_status_error(&self) -> MyAppStatusError {
match self {
MyAppError::InvalidReplicas(count) => MyAppStatusError {
code: "INVALID_REPLICAS".to_string(),
message: format!("Replica count {} is invalid (must be between 1 and 10)", count),
},
MyAppError::InvalidImage(image) => MyAppStatusError {
code: "INVALID_IMAGE".to_string(),
message: format!("Image '{}' has invalid format", image),
},
MyAppError::ServiceUnavailable => MyAppStatusError {
code: "SERVICE_UNAVAILABLE".to_string(),
message: "External service is temporarily unavailable".to_string(),
},
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
pub struct MyAppStatus {
pub state: String,
pub observed_generation: Option<i64>,
pub error: Option<MyAppStatusError>,
}
impl ObserveGeneration for MyAppStatus {
fn add(&mut self, observed_generation: i64) {
self.observed_generation = Some(observed_generation);
}
}
impl WithStatusError<MyAppError, MyAppStatusError> for MyAppStatus {
fn add(&mut self, error: MyAppStatusError) {
self.error = Some(error);
}
}
type MyK8sRepo = K8sRepository<MyApp, StaticApiProvider<MyApp>>;
struct MyContext {
repo: Arc<MyK8sRepo>,
}
impl MyContext {
fn validate_spec(&self, spec: &MyAppSpec) -> Result<(), MyAppError> {
if spec.replicas < 1 || spec.replicas > 10 {
return Err(MyAppError::InvalidReplicas(spec.replicas));
}
if !spec.image.contains(':') {
return Err(MyAppError::InvalidImage(spec.image.clone()));
}
Ok(())
}
}
#[async_trait]
impl Context<MyApp, MyK8sRepo, StaticApiProvider<MyApp>> for MyContext {
fn k8s_repository(&self) -> Arc<MyK8sRepo> {
Arc::clone(&self.repo)
}
fn finalizer(&self) -> &'static str {
"myapps.example.kuberator.io/finalizer"
}
async fn handle_apply(&self, object: Arc<MyApp>) -> KubeResult<Action> {
let name = object.try_name()?;
let namespace = object.try_namespace()?;
tracing::info!("Reconciling MyApp {}/{}", namespace, name);
match self.validate_spec(&object.spec) {
Ok(_) => {
tracing::info!("Validation passed");
tracing::info!(
"Deploying {} replicas with image {}",
object.spec.replicas,
object.spec.image
);
let status = MyAppStatus {
state: "Ready".to_string(),
observed_generation: None,
error: None,
};
self.repo.update_status(&object, status).await?;
Ok(Action::await_change())
}
Err(e) => {
tracing::error!("Validation failed: {}", e);
let status = MyAppStatus {
state: "Error".to_string(),
observed_generation: None,
error: None, };
self.repo.update_status_with_error(&object, status, Some(&e)).await?;
Ok(Action::requeue(std::time::Duration::from_secs(60)))
}
}
}
async fn handle_cleanup(&self, object: Arc<MyApp>) -> KubeResult<Action> {
let name = object.try_name()?;
let namespace = object.try_namespace()?;
tracing::info!("Cleaning up MyApp {}/{}", namespace, name);
let status = MyAppStatus {
state: "Terminating".to_string(),
observed_generation: None,
error: None,
};
let _ = self.repo.update_status(&object, status).await;
Ok(Action::await_change())
}
}
struct MyReconciler {
context: Arc<MyContext>,
crd_api: Api<MyApp>,
}
#[async_trait]
impl Reconcile<MyApp, MyContext, MyK8sRepo, StaticApiProvider<MyApp>> for MyReconciler {
fn destruct(self) -> (Api<MyApp>, Config, Arc<MyContext>) {
(self.crd_api, Config::default(), self.context)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
tracing::info!("Starting Error Handling Example");
let client = Client::try_default().await?;
let api_provider = StaticApiProvider::new(client.clone(), vec!["default"], CachingStrategy::Strict);
let k8s_repo = K8sRepository::new(api_provider);
let context = MyContext {
repo: Arc::new(k8s_repo),
};
let reconciler = MyReconciler {
context: Arc::new(context),
crd_api: Api::namespaced(client, "default"),
};
tracing::info!("Watching MyApps in 'default' namespace");
tracing::info!("This example requires the MyApp CRD to be installed");
tracing::info!("Try creating MyApps with invalid replicas (<1 or >10) to see error handling");
reconciler.start::<futures::future::Ready<()>>(None).await;
Ok(())
}