#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use kube_derive::CustomResource;
use serde::{Deserialize, Serialize};
use apiexts::CustomResourceDefinition;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1beta1 as apiexts;
use kube::{
api::{Api, ListParams, Meta, PatchParams, PatchStrategy, WatchEvent},
Client,
};
/// Spec of the `Foo` custom resource in API group `clux.dev`, version `v1`.
///
/// The `CustomResource` derive produces the namespaced `Foo` kind from this spec,
/// with [`FooStatus`] as its status type. The `scale` attribute points the scale
/// subresource at `.spec.replicas` and `.status.replicas`.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(group = "clux.dev", version = "v1", namespaced)]
#[kube(status = "FooStatus")]
#[kube(apiextensions = "v1beta1")]
#[kube(scale = r#"{"specReplicasPath":".spec.replicas", "statusReplicasPath":".status.replicas"}"#)]
pub struct FooSpec {
    /// Name stored inside the spec.
    name: String,
    /// Optional free-form text.
    info: Option<String>,
    /// Replica count; this is the field `specReplicasPath` refers to.
    replicas: i32,
}
/// Status type attached to `Foo` through `#[kube(status = "FooStatus")]`.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
pub struct FooStatus {
    /// Boolean status flag.
    is_bad: bool,
    /// Replica count; this is the field `statusReplicasPath` refers to.
    replicas: i32,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=info");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or("default".into());
let ssapply = PatchParams {
patch_strategy: PatchStrategy::Apply,
force: true,
field_manager: Some("crd_apply_example".to_string()),
..Default::default()
};
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
info!("Creating crd: {}", serde_yaml::to_string(&Foo::crd())?);
match crds
.patch("foos.clux.dev", &ssapply, serde_yaml::to_vec(&Foo::crd())?)
.await
{
Ok(o) => info!("Applied {}: ({:?})", Meta::name(&o), o.spec),
Err(kube::Error::Api(ae)) => {
warn!("apply error: {:?}", ae);
assert_eq!(ae.code, 409); }
Err(e) => return Err(e.into()),
}
wait_for_crd_ready(&crds).await?;
let foos: Api<Foo> = Api::namespaced(client.clone(), &namespace);
let foo = Foo::new("baz", FooSpec {
name: "baz".into(),
info: Some("old baz".into()),
replicas: 3,
});
info!("Applying 1: \n{}", serde_yaml::to_string(&foo)?);
let o = foos.patch("baz", &ssapply, serde_yaml::to_vec(&foo)?).await?;
info!("Applied 1 {}: {:?}", Meta::name(&o), o.spec);
let patch = serde_json::json!({
"apiVersion": "clux.dev/v1",
"kind": "Foo",
"spec": {
"name": "foo",
"replicas": 2
}
});
info!("Applying 2: \n{}", serde_yaml::to_string(&patch)?);
let o2 = foos.patch("baz", &ssapply, serde_yaml::to_vec(&patch)?).await?;
info!("Applied 2 {}: {:?}", Meta::name(&o2), o2.spec);
Ok(())
}
/// Waits until the `foos.clux.dev` CRD is available.
///
/// Returns immediately when a `get` for the CRD succeeds. Otherwise it watches
/// the CRD by name (watch timeout parameter set to `5`) and returns as soon as a
/// `Modified` event carries a `NamesAccepted` condition whose status is `"True"`.
///
/// # Errors
///
/// Propagates watch/stream errors, and returns a "timed out" error when the
/// watch stream ends without the condition having been observed.
async fn wait_for_crd_ready(crds: &Api<CustomResourceDefinition>) -> anyhow::Result<()> {
    const CRD_NAME: &str = "foos.clux.dev";

    // Fast path: the CRD can already be fetched.
    if crds.get(CRD_NAME).await.is_ok() {
        return Ok(());
    }

    // Restrict the watch to our CRD only.
    let params = ListParams::default()
        .fields(&format!("metadata.name={}", CRD_NAME))
        .timeout(5);
    let mut events = crds.watch(&params, "0").await?.boxed();

    while let Some(event) = events.try_next().await? {
        // Only `Modified` events are inspected; everything else is skipped.
        let crd = match event {
            WatchEvent::Modified(crd) => crd,
            _ => continue,
        };
        info!("Modify event for {}", Meta::name(&crd));

        // Dig out the `NamesAccepted` condition and keep it only if it is "True".
        let accepted = crd
            .status
            .and_then(|status| status.conditions)
            .and_then(|conds| conds.into_iter().find(|c| c.type_ == "NamesAccepted"))
            .filter(|cond| cond.status == "True");
        if let Some(cond) = accepted {
            info!("crd was accepted: {:?}", cond);
            return Ok(());
        }
    }

    Err(anyhow::anyhow!("Timed out waiting for crd to become accepted"))
}