use k8s_openapi::api::core::v1::ResourceRequirements;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Desired state of a `KafkaNodePool` and the input to the `CustomResource` derive.
///
/// The `#[kube(...)]` attribute below declares a namespaced resource of kind
/// `KafkaNodePool` in group `crabka.io`, version `v1alpha1`, with short name
/// `knp` and `KafkaNodePoolStatus` as its status type. Field names are
/// camelCase on the wire.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[kube(
group = "crabka.io",
version = "v1alpha1",
kind = "KafkaNodePool",
plural = "kafkanodepools",
singular = "kafkanodepool",
shortname = "knp",
namespaced,
status = "KafkaNodePoolStatus",
derive = "PartialEq"
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaNodePoolSpec {
/// Roles listed for this pool. Required: the field has no serde default.
pub roles: Vec<NodeRole>,
/// Replica count. Deserializes to 1 when omitted; the schema range sets
/// both the minimum and the maximum to 1.
// NOTE(review): min == max == 1 presumably means multi-replica pools are not
// supported yet — confirm before widening the range.
#[serde(default = "default_replicas")]
#[schemars(range(min = 1, max = 1))]
pub replicas: i32,
/// Starting node id. Required; the schema range is 0 to 999999 inclusive.
// NOTE(review): presumably the first node id handed to this pool's nodes —
// confirm against the code that consumes it.
#[schemars(range(min = 0, max = 999_999))]
pub node_id_start: i32,
/// Optional image. Left out of the serialized form when unset.
// NOTE(review): presumably a container image reference — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
/// Optional Kubernetes `ResourceRequirements`. Left out of the serialized
/// form when unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resources: Option<ResourceRequirements>,
/// Optional `PodTemplate`. Left out of the serialized form when unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub template: Option<PodTemplate>,
/// Optional `Storage` configuration. Left out of the serialized form when
/// unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub storage: Option<Storage>,
}
/// Serde default for `KafkaNodePoolSpec::replicas`: a spec that omits the
/// field is read as having exactly one replica.
const fn default_replicas() -> i32 {
    const SINGLE_REPLICA: i32 = 1;
    SINGLE_REPLICA
}
/// Role that can be listed in a node pool's `roles`: `Controller` or `Broker`.
///
/// No serde rename is applied, so each variant appears on the wire under its
/// Rust name.
// NOTE(review): the variants are documented here rather than with per-variant
// `///` comments because the `JsonSchema` derive reads doc comments; adding
// them to individual variants would change the schema generated for this enum.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash)]
pub enum NodeRole {
Controller,
Broker,
}
/// Storage configuration, internally tagged by a `type` field.
///
/// - `Ephemeral` carries no data.
/// - `PersistentClaim` carries a `PersistentClaimSpec`.
/// - `Jbod` carries a `JbodSpec`.
///
/// Because the tag is internal, a variant's payload fields sit next to `type`
/// in one flat object. The JSON schema is not derived from the variants; it is
/// produced by `storage_schema`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(tag = "type")]
#[schemars(schema_with = "storage_schema")]
pub enum Storage {
Ephemeral,
PersistentClaim(PersistentClaimSpec),
Jbod(JbodSpec),
}
/// Hand-written JSON schema for `Storage`, referenced from its `schema_with`
/// attribute.
///
/// Describes a single flat object: a required string `type` limited to
/// `Ephemeral`, `PersistentClaim` or `Jbod`, plus the optional properties
/// `size`, `class`, `deleteClaim` and `volumes`. Every `volumes` item is an
/// object that must carry `id` (int32) and `size` and may carry `class`.
/// The generator argument is not used.
fn storage_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
    schemars::json_schema!({
        "type": "object",
        "required": ["type"],
        "properties": {
            // Discriminator; the names match the variants of `Storage`.
            "type": {
                "type": "string",
                "enum": [
                    "Ephemeral",
                    "PersistentClaim",
                    "Jbod"
                ]
            },
            // Scalar properties; none of them is listed as required.
            "size": { "type": "string" },
            "class": { "type": "string" },
            "deleteClaim": { "type": "boolean" },
            // Volume list; each entry needs at least `id` and `size`.
            "volumes": {
                "type": "array",
                "items": {
                    "type": "object",
                    "required": ["id", "size"],
                    "properties": {
                        "id": { "type": "integer", "format": "int32" },
                        "size": { "type": "string" },
                        "class": { "type": "string" }
                    }
                }
            }
        }
    })
}
/// Payload of `Storage::PersistentClaim`. Field names are camelCase on the wire.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct PersistentClaimSpec {
/// Requested size, kept as a string. Required.
pub size: String,
/// Optional class. Left out of the serialized form when unset.
// NOTE(review): presumably a Kubernetes StorageClass name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub class: Option<String>,
/// Serialized as `deleteClaim`; reads as `false` when omitted.
// NOTE(review): presumably decides whether the claim is removed together with
// the pool — confirm against the controller.
#[serde(default)]
pub delete_claim: bool,
}
/// Payload of `Storage::Jbod`: a list of volumes plus a `deleteClaim` flag.
/// Field names are camelCase on the wire.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct JbodSpec {
/// Volumes of this configuration. Required: the field has no serde default.
pub volumes: Vec<JbodVolume>,
/// Serialized as `deleteClaim`; reads as `false` when omitted.
// NOTE(review): presumably applies to the claims of all listed volumes —
// confirm against the controller.
#[serde(default)]
pub delete_claim: bool,
}
/// One entry of `JbodSpec::volumes`. Field names are camelCase on the wire.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct JbodVolume {
/// Numeric id of the volume. Required.
// NOTE(review): nothing in this type enforces that ids are unique within a
// list — confirm whether that is validated elsewhere.
pub id: i32,
/// Requested size, kept as a string. Required.
pub size: String,
/// Optional class. Left out of the serialized form when unset.
// NOTE(review): presumably a Kubernetes StorageClass name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub class: Option<String>,
}
/// Optional metadata and scheduling settings carried in
/// `KafkaNodePoolSpec::template`. Every field may be omitted, and unset or
/// empty fields are left out of the serialized form.
// NOTE(review): presumably these are applied to the pods created for the pool —
// confirm against the controller.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct PodTemplate {
/// Optional labels and annotations.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<MetadataTemplate>,
/// Optional Kubernetes `Affinity`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,
/// Kubernetes `Toleration` entries. Reads as an empty list when omitted,
/// and an empty list is not serialized.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
/// Optional string-to-string map, serialized as `nodeSelector`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub node_selector: Option<std::collections::BTreeMap<String, String>>,
}
/// Labels and annotations carried in `PodTemplate::metadata`. Both maps read
/// as empty when omitted, and an empty map is not serialized.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MetadataTemplate {
/// Label key/value pairs.
#[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
pub labels: std::collections::BTreeMap<String, String>,
/// Annotation key/value pairs.
#[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
pub annotations: std::collections::BTreeMap<String, String>,
}
/// Status type of `KafkaNodePool`, named by `status = "KafkaNodePoolStatus"` in
/// the `#[kube(...)]` attribute on the spec. Field names are camelCase on the
/// wire.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaNodePoolStatus {
/// Conditions reported for the pool. Reads as an empty list when omitted;
/// unlike the optional fields below it is always serialized, even when
/// empty.
#[serde(default)]
pub conditions: Vec<crate::crd::kafka::KafkaCondition>,
/// Optional replica count. Left out of the serialized form when unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replicas: Option<i32>,
/// Optional ready-replica count, serialized as `readyReplicas`. Left out of
/// the serialized form when unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ready_replicas: Option<i32>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use kube::CustomResourceExt as _;

    /// Smallest spec document the tests parse: only the two required fields.
    const MINIMAL_SPEC_JSON: &str = r#"{"roles":["Controller","Broker"],"nodeIdStart":0}"#;

    /// Builds the pool every round-trip test starts from: named `brokers`,
    /// both roles, one replica, node ids from 0, no image or resources, and
    /// the given template and storage.
    fn brokers_pool(template: Option<PodTemplate>, storage: Option<Storage>) -> KafkaNodePool {
        let spec = KafkaNodePoolSpec {
            roles: vec![NodeRole::Controller, NodeRole::Broker],
            replicas: 1,
            node_id_start: 0,
            image: None,
            resources: None,
            template,
            storage,
        };
        KafkaNodePool::new("brokers", spec)
    }

    /// Parses `json` back into a pool and checks that its spec equals the
    /// spec of `expected`.
    fn assert_spec_survives(json: &str, expected: &KafkaNodePool) {
        let back: KafkaNodePool = serde_json::from_str(json).unwrap();
        assert!(back.spec == expected.spec);
    }

    /// The generated CRD carries the group, names, short name and version
    /// declared in the `#[kube(...)]` attribute.
    #[test]
    fn crd_metadata_is_correct() {
        let crd = KafkaNodePool::crd();
        let names = &crd.spec.names;
        assert!(crd.spec.group == "crabka.io");
        assert!(names.kind == "KafkaNodePool");
        assert!(names.plural == "kafkanodepools");
        let has_knp = names
            .short_names
            .iter()
            .flatten()
            .any(|short| short.as_str() == "knp");
        assert!(
            has_knp,
            "expected shortname `knp`, got {:?}",
            names.short_names
        );
        let versions = &crd.spec.versions;
        assert!(versions.len() == 1);
        assert!(versions[0].name == "v1alpha1");
    }

    /// A pool without template or storage keeps its spec through JSON and
    /// uses camelCase field names and UpperCamelCase role names.
    #[test]
    fn round_trips_through_json() {
        let pool = brokers_pool(None, None);
        let json = serde_json::to_string(&pool).unwrap();
        assert!(
            json.contains("\"nodeIdStart\""),
            "expected camelCase wire shape, got: {json}"
        );
        assert!(
            json.contains("\"Controller\""),
            "roles serialized in UpperCamelCase, got: {json}"
        );
        assert_spec_survives(&json, &pool);
    }

    /// Omitting `replicas` yields 1, and the optional fields stay unset.
    #[test]
    fn spec_defaults_replicas_to_one() {
        let spec: KafkaNodePoolSpec = serde_json::from_str(MINIMAL_SPEC_JSON).unwrap();
        assert!(spec.replicas == 1);
        assert!(spec.image.is_none());
        assert!(spec.resources.is_none());
    }

    /// A fully populated pod template keeps its spec through JSON.
    #[test]
    fn pod_template_round_trips_through_json() {
        use k8s_openapi::api::core::v1::{
            Affinity, NodeAffinity, NodeSelector, NodeSelectorTerm, Toleration,
        };
        use std::collections::BTreeMap;

        let metadata = MetadataTemplate {
            labels: BTreeMap::from([(String::from("team"), String::from("platform"))]),
            annotations: BTreeMap::new(),
        };
        let node_affinity = NodeAffinity {
            required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                node_selector_terms: vec![NodeSelectorTerm::default()],
            }),
            preferred_during_scheduling_ignored_during_execution: None,
        };
        let dedicated = Toleration {
            key: Some("dedicated".into()),
            operator: Some("Exists".into()),
            effect: Some("NoSchedule".into()),
            ..Default::default()
        };
        let template = PodTemplate {
            metadata: Some(metadata),
            affinity: Some(Affinity {
                node_affinity: Some(node_affinity),
                ..Default::default()
            }),
            tolerations: vec![dedicated],
            node_selector: Some(BTreeMap::from([(
                String::from("kubernetes.io/os"),
                String::from("linux"),
            )])),
        };

        let pool = brokers_pool(Some(template), None);
        let json = serde_json::to_string(&pool).unwrap();
        assert!(json.contains("\"team\":\"platform\""), "labels: {json}");
        assert!(json.contains("\"dedicated\""), "tolerations: {json}");
        assert!(json.contains("\"nodeSelector\""), "node_selector: {json}");
        assert_spec_survives(&json, &pool);
    }

    /// `Storage::Ephemeral` serializes to an object holding only the tag.
    #[test]
    fn storage_ephemeral_round_trips_through_json() {
        let pool = brokers_pool(None, Some(Storage::Ephemeral));
        let json = serde_json::to_string(&pool).unwrap();
        assert!(
            json.contains("\"storage\":{\"type\":\"Ephemeral\"}"),
            "expected flat tagged shape, got: {json}"
        );
        assert_spec_survives(&json, &pool);
    }

    /// `Storage::PersistentClaim` keeps its tag and all three payload fields.
    #[test]
    fn storage_persistent_claim_round_trips_through_json() {
        let claim = PersistentClaimSpec {
            size: "10Gi".into(),
            class: Some("fast-ssd".into()),
            delete_claim: true,
        };
        let pool = brokers_pool(None, Some(Storage::PersistentClaim(claim)));
        let json = serde_json::to_string(&pool).unwrap();
        for needle in [
            "\"type\":\"PersistentClaim\"",
            "\"size\":\"10Gi\"",
            "\"class\":\"fast-ssd\"",
            "\"deleteClaim\":true",
        ] {
            assert!(json.contains(needle), "got: {json}");
        }
        assert_spec_survives(&json, &pool);
    }

    /// Omitting `storage` leaves the field unset.
    #[test]
    fn spec_defaults_storage_to_none() {
        let spec: KafkaNodePoolSpec = serde_json::from_str(MINIMAL_SPEC_JSON).unwrap();
        assert!(spec.storage.is_none());
    }

    /// `Storage::Jbod` keeps its tag, volume list and `deleteClaim` flag.
    #[test]
    fn storage_jbod_round_trips_through_json() {
        let volumes = vec![
            JbodVolume {
                id: 0,
                size: "10Gi".into(),
                class: None,
            },
            JbodVolume {
                id: 1,
                size: "20Gi".into(),
                class: Some("fast-ssd".into()),
            },
        ];
        let jbod = JbodSpec {
            volumes,
            delete_claim: true,
        };
        let pool = brokers_pool(None, Some(Storage::Jbod(jbod)));
        let json = serde_json::to_string(&pool).unwrap();
        for needle in [
            "\"type\":\"Jbod\"",
            "\"volumes\":[",
            "\"id\":0",
            "\"size\":\"20Gi\"",
            "\"class\":\"fast-ssd\"",
            "\"deleteClaim\":true",
        ] {
            assert!(json.contains(needle), "got: {json}");
        }
        assert_spec_survives(&json, &pool);
    }

    /// A hand-written flat `Jbod` object parses into `Storage::Jbod`, with
    /// `deleteClaim` falling back to `false`.
    #[test]
    fn storage_jbod_deserializes_flat_wire_shape() {
        // The literal is kept byte-for-byte, hence the unindented lines.
        let json = r#"{
"roles":["Controller","Broker"],
"nodeIdStart":0,
"storage":{
"type":"Jbod",
"volumes":[{"id":0,"size":"1Gi"},{"id":1,"size":"1Gi"}]
}
}"#;
        let spec: KafkaNodePoolSpec = serde_json::from_str(json).unwrap();
        let jbod = match spec.storage {
            Some(Storage::Jbod(jbod)) => jbod,
            other => panic!("expected Jbod, got {other:?}"),
        };
        assert!(jbod.volumes.len() == 2);
        assert!(jbod.volumes[0].id == 0);
        assert!(!jbod.delete_claim, "deleteClaim defaults to false");
    }
}