use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[kube(
group = "crabka.io",
version = "v1alpha1",
kind = "KafkaRebalance",
plural = "kafkarebalances",
singular = "kafkarebalance",
shortname = "kr",
namespaced,
status = "KafkaRebalanceStatus",
derive = "PartialEq"
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaRebalanceSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub goals: Option<Vec<String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1))]
pub throttle_bytes_per_sec: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct OptimizationResult {
#[serde(default)]
pub replica_movements: i32,
#[serde(default)]
pub leader_movements: i32,
#[serde(default)]
pub max_replicas_before: i32,
#[serde(default)]
pub max_replicas_after: i32,
#[serde(default)]
pub max_leaders_before: i32,
#[serde(default)]
pub max_leaders_after: i32,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub goals: Vec<String>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaRebalanceStatus {
#[serde(default)]
pub conditions: Vec<crate::crd::KafkaCondition>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub observed_generation: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub optimization_result: Option<OptimizationResult>,
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use kube::CustomResourceExt as _;
#[test]
fn crd_metadata_is_correct() {
let crd = KafkaRebalance::crd();
assert!(crd.spec.group == "crabka.io");
assert!(crd.spec.names.kind == "KafkaRebalance");
assert!(crd.spec.names.plural == "kafkarebalances");
assert!(
crd.spec
.names
.short_names
.as_ref()
.is_some_and(|v| v.contains(&"kr".to_string())),
"expected shortname `kr`",
);
assert!(crd.spec.versions.len() == 1);
assert!(crd.spec.versions[0].name == "v1alpha1");
}
#[test]
fn spec_round_trips_through_json() {
let kr = KafkaRebalance::new(
"demo-rebalance",
KafkaRebalanceSpec {
goals: Some(vec!["RackAware".into(), "ReplicaDistribution".into()]),
throttle_bytes_per_sec: Some(10_000_000),
endpoint: Some("http://r.kafka.svc:9300".into()),
},
);
let json = serde_json::to_string(&kr).unwrap();
assert!(json.contains("\"goals\":[\"RackAware\""), "got: {json}");
assert!(
json.contains("\"throttleBytesPerSec\":10000000"),
"got: {json}"
);
assert!(
json.contains("\"endpoint\":\"http://r.kafka.svc:9300\""),
"got: {json}"
);
let back: KafkaRebalance = serde_json::from_str(&json).unwrap();
assert!(back.spec == kr.spec);
}
#[test]
fn empty_spec_parses_and_omits_optionals() {
let spec: KafkaRebalanceSpec = serde_json::from_str("{}").unwrap();
assert!(spec.goals.is_none());
assert!(spec.throttle_bytes_per_sec.is_none());
assert!(spec.endpoint.is_none());
let j = serde_json::to_string(&spec).unwrap();
assert!(j == "{}", "all-default spec must serialize to empty object");
}
#[test]
fn status_omits_optional_fields_when_none() {
let status = KafkaRebalanceStatus {
conditions: vec![],
observed_generation: Some(3),
session_id: None,
optimization_result: None,
};
let j = serde_json::to_string(&status).unwrap();
assert!(!j.contains("sessionId"), "got: {j}");
assert!(!j.contains("optimizationResult"), "got: {j}");
assert!(j.contains("\"observedGeneration\":3"), "got: {j}");
}
#[test]
fn optimization_result_defaults_to_zeroes() {
let r: OptimizationResult = serde_json::from_str("{}").unwrap();
assert!(r == OptimizationResult::default());
assert!(r.replica_movements == 0);
assert!(r.goals.is_empty());
}
}