use std::collections::BTreeMap;
use k8s_openapi::api::core::v1::ResourceRequirements;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[kube(
group = "crabka.io",
version = "v1alpha1",
kind = "KafkaGrpcGateway",
plural = "kafkagrpcgateways",
singular = "kafkagrpcgateway",
shortname = "kgg",
namespaced,
status = "KafkaGrpcGatewayStatus",
derive = "PartialEq"
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaGrpcGatewaySpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replicas: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resources: Option<ResourceRequirements>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dedup: Option<DedupSpec>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tls: Option<GatewayTlsSpec>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub authz: Option<GatewayAuthzSpec>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub webhooks: Vec<InboundWebhookSpec>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub outbound_subscriptions: Vec<OutboundSubscriptionSpec>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub allowed_targets: Vec<AllowedTargetSpec>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub telemetry: Option<TelemetrySpec>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct DedupSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub topic: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub partitions: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub window_ms: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub txn_id_prefix: Option<String>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct GatewayTlsSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub client_auth: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub validity_days: Option<u32>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct GatewayAuthzSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mode: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub super_users: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub acl_refresh_secs: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bearer: Option<GatewayBearerSpec>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct GatewayBearerSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mode: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub principal_claim: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct InboundWebhookSpec {
pub name: String,
pub target_topic: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub principal: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub signature_header: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub signature_encoding: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub signature_prefix: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timestamp_header: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timestamp_tolerance_secs: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub idempotency_source: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key_source: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_body_bytes: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub secret_ref: Option<SecretKeyRef>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct OutboundSubscriptionSpec {
pub name: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub source_topics: Vec<String>,
pub target_url: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dead_letter_topic: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_attempts: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub base_backoff_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_backoff_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_timeout_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filter: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub headers: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub signing_secret_ref: Option<SecretKeyRef>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct AllowedTargetSpec {
pub scheme: String,
pub host: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SecretKeyRef {
pub name: String,
pub key: String,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct TelemetrySpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub otlp_endpoint: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub otlp_protocol: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sample_ratio: Option<f64>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaGrpcGatewayStatus {
#[serde(default)]
pub conditions: Vec<crate::crd::KafkaCondition>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub observed_generation: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ready_replicas: Option<i32>,
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use kube::CustomResourceExt as _;
#[test]
fn crd_metadata_is_correct() {
let crd = KafkaGrpcGateway::crd();
assert!(crd.spec.group == "crabka.io");
assert!(crd.spec.names.kind == "KafkaGrpcGateway");
assert!(crd.spec.names.plural == "kafkagrpcgateways");
assert!(
crd.spec
.names
.short_names
.as_ref()
.is_some_and(|v| v.contains(&"kgg".to_string())),
"expected shortname `kgg`",
);
assert!(crd.spec.versions.len() == 1);
assert!(crd.spec.versions[0].name == "v1alpha1");
}
#[test]
fn minimal_spec_parses() {
let json = r"{}";
let spec: KafkaGrpcGatewaySpec = serde_json::from_str(json).unwrap();
assert!(spec.replicas.is_none());
assert!(spec.image.is_none());
assert!(spec.webhooks.is_empty());
assert!(spec.outbound_subscriptions.is_empty());
}
#[test]
fn spec_round_trips_through_json() {
let gw = KafkaGrpcGateway::new(
"my-gateway",
KafkaGrpcGatewaySpec {
replicas: Some(2),
image: Some("ghcr.io/robot-head/crabka-grpc-gateway:latest".into()),
resources: None,
dedup: Some(DedupSpec {
topic: Some("my-gateway-dedup".into()),
partitions: Some(16),
window_ms: Some(86_400_000),
txn_id_prefix: Some("gw".into()),
}),
tls: Some(GatewayTlsSpec {
client_auth: Some("required".into()),
validity_days: Some(365),
}),
authz: Some(GatewayAuthzSpec {
mode: Some("simple".into()),
super_users: vec!["User:admin".into()],
acl_refresh_secs: Some(60),
bearer: Some(GatewayBearerSpec {
mode: Some("off".into()),
principal_claim: None,
}),
}),
webhooks: vec![InboundWebhookSpec {
name: "orders".into(),
target_topic: "raw-orders".into(),
principal: Some("User:webhook-producer".into()),
signature_header: Some("X-Hub-Signature-256".into()),
signature_encoding: Some("hex".into()),
signature_prefix: Some("sha256=".into()),
timestamp_header: None,
timestamp_tolerance_secs: Some(300),
idempotency_source: Some("header:X-Idempotency-Key".into()),
key_source: None,
max_body_bytes: Some(1_048_576),
secret_ref: Some(SecretKeyRef {
name: "orders-webhook-secret".into(),
key: "hmac-key".into(),
}),
}],
outbound_subscriptions: vec![OutboundSubscriptionSpec {
name: "processed-orders".into(),
source_topics: vec!["processed-orders".into()],
target_url: "https://example.com/hook".into(),
dead_letter_topic: Some("failed-deliveries".into()),
max_attempts: Some(5),
base_backoff_ms: Some(1000),
max_backoff_ms: Some(30_000),
request_timeout_ms: Some(10_000),
filter: None,
headers: BTreeMap::from([(
"Authorization".to_string(),
"Bearer token".to_string(),
)]),
signing_secret_ref: None,
}],
allowed_targets: vec![AllowedTargetSpec {
scheme: "https".into(),
host: "example.com".into(),
}],
telemetry: Some(TelemetrySpec {
otlp_endpoint: Some("http://otel:4317".into()),
otlp_protocol: Some("grpc".into()),
sample_ratio: Some(1.0),
}),
},
);
let json = serde_json::to_string(&gw).unwrap();
assert!(json.contains("\"replicas\":2"), "got: {json}");
assert!(
json.contains("\"targetTopic\":\"raw-orders\""),
"got: {json}"
);
assert!(
json.contains("\"targetUrl\":\"https://example.com/hook\""),
"got: {json}"
);
assert!(
json.contains("\"otlpEndpoint\":\"http://otel:4317\""),
"got: {json}"
);
let back: KafkaGrpcGateway = serde_json::from_str(&json).unwrap();
assert!(back.spec == gw.spec);
}
#[test]
fn spec_omits_empty_optional_fields() {
let spec = KafkaGrpcGatewaySpec {
replicas: None,
image: None,
resources: None,
dedup: None,
tls: None,
authz: None,
webhooks: vec![],
outbound_subscriptions: vec![],
allowed_targets: vec![],
telemetry: None,
};
let j = serde_json::to_string(&spec).unwrap();
assert!(!j.contains("replicas"), "got: {j}");
assert!(!j.contains("image"), "got: {j}");
assert!(!j.contains("webhooks"), "got: {j}");
assert!(!j.contains("outboundSubscriptions"), "got: {j}");
assert!(!j.contains("telemetry"), "got: {j}");
}
#[test]
fn status_omits_optional_fields_when_unset() {
let status = KafkaGrpcGatewayStatus::default();
let j = serde_json::to_string(&status).unwrap();
assert!(!j.contains("observedGeneration"), "got: {j}");
assert!(!j.contains("readyReplicas"), "got: {j}");
}
#[test]
fn outbound_subscription_headers_round_trip() {
let sub = OutboundSubscriptionSpec {
name: "sub1".into(),
source_topics: vec!["topic-a".into()],
target_url: "https://example.com/hook".into(),
dead_letter_topic: None,
max_attempts: None,
base_backoff_ms: None,
max_backoff_ms: None,
request_timeout_ms: None,
filter: None,
headers: BTreeMap::from([
("X-Tenant".to_string(), "acme".to_string()),
("Content-Type".to_string(), "application/json".to_string()),
]),
signing_secret_ref: None,
};
let j = serde_json::to_string(&sub).unwrap();
assert!(j.contains("\"X-Tenant\":\"acme\""), "got: {j}");
let back: OutboundSubscriptionSpec = serde_json::from_str(&j).unwrap();
assert!(back == sub);
}
#[test]
fn secret_key_ref_round_trips() {
let r = SecretKeyRef {
name: "my-secret".into(),
key: "hmac-key".into(),
};
let j = serde_json::to_string(&r).unwrap();
assert!(j == r#"{"name":"my-secret","key":"hmac-key"}"#, "got: {j}");
let back: SecretKeyRef = serde_json::from_str(&j).unwrap();
assert!(back == r);
}
}