Skip to main content

crabka_operator/controller/
common.rs

1//! Shared helpers across the `Kafka` and `KafkaNodePool` reconcilers.
2//!
3//! The `Kafka`-owned cluster-level objects (`Service`, `ConfigMap`,
4//! cluster-id `Secret`) live here because both reconcilers need to refer
5//! to their names (the pool reconciler reads the Secret; the parent
6//! reconciler renders+applies them). The status-derivation helper, the
7//! SSA / merge-patch wrappers, and the labels / owner-ref helpers are
8//! shared verbatim.
9
10use std::collections::BTreeMap;
11use std::fmt::Debug;
12
13use k8s_openapi::ByteString;
14use k8s_openapi::api::apps::v1::StatefulSet;
15use k8s_openapi::api::core::v1::{ConfigMap, Secret, Service};
16use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
17use kube::Resource;
18use kube::api::{Api, DynamicObject, Patch, PatchParams, PostParams};
19use kube::core::{ApiResource, GroupVersionKind};
20use serde::Serialize;
21use serde::de::DeserializeOwned;
22use serde_json::json;
23use uuid::Uuid;
24
25use crate::crd::{Kafka, KafkaCondition};
26
/// Field-manager identity attached to the patches issued by the helpers in
/// this module.
pub(crate) const FIELD_MANAGER: &str = "crabka-operator";

/// Port number used for both `port` and `targetPort` of the `kafka-internal`
/// entry on the headless broker `Service`.
pub(crate) const BROKER_PORT: i32 = 9092;
/// Value of the `app.kubernetes.io/name` label (and of the matching `Service`
/// selector entry) on operator-rendered objects.
pub(crate) const APP_LABEL: &str = "crabka-broker";
/// Broker image reference whose tag is this crate's `CARGO_PKG_VERSION`,
/// captured at compile time.
// NOTE(review): presumably the fallback when the spec does not name an image —
// confirm against the pod-template renderer.
pub(crate) const DEFAULT_BROKER_IMAGE: &str = concat!(
    "ghcr.io/robot-head/crabka-broker:",
    env!("CARGO_PKG_VERSION")
);
35
/// Reconcile-error surface shared by both reconcilers.
///
/// Variants marked `#[from]` are produced implicitly by `?` on the wrapped
/// error type; every other variant is constructed explicitly where the
/// problem is detected.
#[derive(Debug, thiserror::Error)]
pub enum ReconcileError {
    /// An error returned by the Kubernetes client.
    #[error("kube error: {0}")]
    Kube(#[from] kube::Error),
    /// The resource carries no `metadata.uid`, so an owner reference cannot
    /// be built for it.
    #[error("resource missing uid (not yet admitted)")]
    MissingUid,
    /// A JSON (de)serialization failure.
    #[error("serde error: {0}")]
    Serde(#[from] serde_json::Error),
    /// `spec.replicas` holds the carried value, which is not the single
    /// supported replica count of 1.
    #[error("spec.replicas={0} is unsupported (only 1 allowed)")]
    UnsupportedReplicas(i32),
    /// The cluster-id Secret could not be decoded; the payload describes
    /// which step failed.
    #[error("cluster-id secret malformed: {0}")]
    MalformedSecret(String),
    /// `metricsConfig` sets both `podMonitor` and `serviceMonitor`.
    #[error("metricsConfig: podMonitor and serviceMonitor are mutually exclusive")]
    MetricsMutuallyExclusive,
    /// The `monitoring.coreos.com/v1` API group is not served by the API
    /// server.
    #[error("monitoring.coreos.com/v1 is not served by the API server")]
    PrometheusOperatorCrdsMissing,
    /// Generic malformed-input error; the payload describes the problem.
    #[error("malformed input: {0}")]
    Malformed(String),
    /// An error raised by the `crabka_security` CA module.
    #[error("CA: {0}")]
    Ca(#[from] crabka_security::ca::CaError),
    /// A certificate could not be parsed; the payload describes the failure.
    #[error("cert parse: {0}")]
    CertParse(String),
    /// With `generateCertificateAuthority=false`, the pre-existing Secret
    /// pair for the CA named by `which` was not found.
    #[error(
        "BYO CA missing: {which} requires pre-existing Secret pair (generateCertificateAuthority=false)"
    )]
    ByoCaMissing { which: String },
    /// The bring-your-own CA named by `which` is malformed for the given
    /// `reason`.
    #[allow(dead_code)] // reserved to surface BYO CA parse failures at reconcile time
    #[error("BYO CA malformed: {which}: {reason}")]
    ByoCaMalformed { which: String, reason: String },
    /// The CA Secret called `name` does not exist.
    #[error("CA Secret missing: {name}")]
    CaSecretMissing { name: String },
    /// The named oauth trust Secret was not found.
    #[error("oauth trust Secret '{0}' not found")]
    MissingOauthTrustSecret(String),
    /// The oauth trust Secret exists but does not contain `key`.
    #[error("oauth trust Secret '{secret}' has no key '{key}'")]
    MissingOauthTrustKey { secret: String, key: String },
    /// The oauth trust Secret contains `key`, but its value is empty.
    #[error("oauth trust Secret '{secret}' key '{key}' is empty")]
    EmptyOauthTrustValue { secret: String, key: String },
    /// An oauth listener's `accessTokenIsJwt` setting
    /// disagrees with which mode-specific fields are set (JWT-mode
    /// requires `jwksEndpointUri` and rejects introspection fields;
    /// introspection-mode requires `introspectionEndpointUri` + `clientId`
    /// + `clientSecret` and rejects `jwksEndpointUri`).
    #[error("listener OAuth: {0}")]
    InvalidListenerOauthAccessTokenIsJwt(String),
    /// An oauth listener's `clientSecret.secretName` doesn't
    /// exist in the cluster's namespace.
    #[error("oauth introspection Secret '{0}' not found")]
    MissingOauthIntrospectionSecret(String),
    /// An oauth listener's `clientSecret.secretName` exists
    /// but does not contain the named `key`.
    #[error("oauth introspection Secret '{secret}' has no key '{key}'")]
    MissingOauthIntrospectionKey { secret: String, key: String },
    /// An oauth listener's `clientSecret` Secret + key both
    /// exist but the value is zero bytes.
    #[error("oauth introspection Secret '{secret}' key '{key}' is empty")]
    EmptyOauthIntrospectionValue { secret: String, key: String },
    /// `type: gssapi` listener references a keytab Secret that doesn't exist.
    #[error("gssapi keytab Secret '{0}' not found")]
    MissingGssapiKeytabSecret(String),
    /// keytab Secret exists but lacks the referenced key.
    #[error("gssapi keytab Secret '{secret}' has no key '{key}'")]
    MissingGssapiKeytabKey { secret: String, key: String },
    /// `spec.krb5ConfSecretRef` references a Secret that doesn't exist.
    #[error("krb5.conf Secret '{0}' not found")]
    MissingKrb5ConfSecret(String),
    /// `spec.krb5ConfSecretRef` Secret exists but lacks the referenced key.
    // NOTE(review): this message uses `{:?}` quoting and "is missing key",
    // unlike the sibling "'{secret}' has no key '{key}'" messages — confirm
    // whether the difference is intentional.
    #[error("krb5.conf Secret {secret:?} is missing key {key:?}")]
    MissingKrb5ConfKey { secret: String, key: String },
    /// KIP-405: `spec.tieredStorage` failed shape
    /// validation. Concrete cases: `type = "S3"` without `spec.tieredStorage.s3`,
    /// `type = "Local"` with `spec.tieredStorage.s3` set, or an S3 spec
    /// missing required `bucket` / `region`. The reconciler returns this
    /// before rendering any `ConfigMap` so the broker pod never boots
    /// against malformed `[remote_storage]` TOML.
    #[error("tieredStorage: {0}")]
    TieredStorageInvalid(String),

    /// `spec.tracing` failed shape validation. Concrete
    /// cases: `type = "Otlp"` without an `otlp` block; `otlp.endpoint`
    /// empty; `sampleRatio` outside `[0.0, 1.0]`; `timeoutSecs = 0`.
    /// The reconciler returns this before rendering any pod template
    /// so the broker pod never boots with broken OTLP env vars.
    #[error("tracing: {0}")]
    TracingInvalid(String),
}
122
123/// Build a Kubernetes-style condition with `lastTransitionTime` set to
124/// now (RFC3339, second precision, with `Z`).
125pub(crate) fn condition(type_: &str, status: &str, reason: &str, message: &str) -> KafkaCondition {
126    KafkaCondition {
127        type_: type_.into(),
128        status: status.into(),
129        reason: reason.into(),
130        message: message.into(),
131        last_transition_time: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
132    }
133}
134
135/// Server-side apply a typed object. Field manager is `crabka-operator`,
136/// force-takeover is on so we wrest fields back from any previous manager
137/// if any happen to linger. Object shape is stable across reconciles
138/// because renderers are pure functions of the owner.
139pub(crate) async fn apply_object<K>(api: &Api<K>, name: &str, obj: &K) -> Result<(), ReconcileError>
140where
141    K: Resource + Clone + Serialize + DeserializeOwned + Debug,
142{
143    let params = PatchParams {
144        field_manager: Some(FIELD_MANAGER.into()),
145        force: true,
146        ..Default::default()
147    };
148    api.patch(name, &params, &Patch::Apply(obj)).await?;
149    Ok(())
150}
151
152/// Server-side apply an arbitrary object that is not in `k8s-openapi` (e.g. an
153/// `OpenShift` `Route`), given its GVK + plural and a JSON body. Errors —
154/// including a 404 when the CRD's API is not served (a non-`OpenShift` cluster)
155/// — propagate to the caller.
156pub(crate) async fn apply_dynamic(
157    client: &kube::Client,
158    namespace: &str,
159    api_version: &str,
160    kind: &str,
161    plural: &str,
162    name: &str,
163    body: &serde_json::Value,
164) -> Result<(), ReconcileError> {
165    let (group, version) = api_version
166        .split_once('/')
167        .ok_or_else(|| ReconcileError::Malformed("apiVersion missing '/'".into()))?;
168    let gvk = GroupVersionKind::gvk(group, version, kind);
169    let ar = ApiResource::from_gvk_with_plural(&gvk, plural);
170    let api: Api<DynamicObject> = Api::namespaced_with(client.clone(), namespace, &ar);
171    let obj: DynamicObject = serde_json::from_value(body.clone())?;
172    let pp = PatchParams::apply(FIELD_MANAGER).force();
173    api.patch(name, &pp, &Patch::Apply(&obj)).await?;
174    Ok(())
175}
176
177/// Merge-patch the status subresource of a CR. Uses `Patch::Merge` so we
178/// only overwrite the fields we set rather than replacing the whole
179/// status (which would conflict with any future status writers). Generic
180/// over the parent resource `K` and status payload `S`.
181pub(crate) async fn patch_status<K, S>(
182    api: &Api<K>,
183    name: &str,
184    status: S,
185) -> Result<(), ReconcileError>
186where
187    K: Resource + Clone + Serialize + DeserializeOwned + Debug,
188    <K as Resource>::DynamicType: Default,
189    S: Serialize,
190{
191    let patch = json!({ "status": status });
192    let params = PatchParams {
193        field_manager: Some(FIELD_MANAGER.into()),
194        ..Default::default()
195    };
196    api.patch_status(name, &params, &Patch::Merge(&patch))
197        .await?;
198    Ok(())
199}
200
201/// Common labels for objects owned by a `Kafka`. When the object belongs
202/// to a specific pool (i.e. pod-level labels on a `StatefulSet`), pass
203/// `Some(<pool name>)`; cluster-level objects (`Service` / `ConfigMap` /
204/// `Secret`) pass `None`.
205pub(crate) fn common_labels(
206    kafka_name: &str,
207    kafka_version: &str,
208    pool: Option<&str>,
209) -> BTreeMap<String, String> {
210    let mut m = BTreeMap::new();
211    m.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
212    m.insert("app.kubernetes.io/instance".into(), kafka_name.into());
213    m.insert("app.kubernetes.io/version".into(), kafka_version.into());
214    m.insert(
215        "app.kubernetes.io/managed-by".into(),
216        "crabka-operator".into(),
217    );
218    if let Some(p) = pool {
219        m.insert("crabka.io/pool".into(), p.into());
220    }
221    m
222}
223
224/// Generic owner-reference builder. Works for any CR (`Kafka`,
225/// `KafkaNodePool`) whose `DynamicType = ()`. Reads `apiVersion` and
226/// `kind` from the trait, name from the metadata.
227pub(crate) fn owner_ref<T>(obj: &T) -> Result<OwnerReference, ReconcileError>
228where
229    T: Resource<DynamicType = ()>,
230{
231    let uid = obj
232        .meta()
233        .uid
234        .as_deref()
235        .ok_or(ReconcileError::MissingUid)?;
236    Ok(OwnerReference {
237        api_version: T::api_version(&()).to_string(),
238        kind: T::kind(&()).to_string(),
239        name: obj.meta().name.clone().unwrap_or_default(),
240        uid: uid.to_string(),
241        controller: Some(true),
242        block_owner_deletion: Some(true),
243    })
244}
245
246/// Render the cluster-level headless `Service`. Owner-ref'd to the
247/// parent `Kafka`. Selector matches every pool's pods via the shared
248/// `app.kubernetes.io/instance` + `app.kubernetes.io/name` labels.
249pub(crate) fn render_service(owner: &Kafka) -> Result<Service, ReconcileError> {
250    let name = owner.meta().name.clone().unwrap_or_default();
251    let labels = common_labels(&name, &owner.spec.kafka_version, None);
252    let mut selector: BTreeMap<String, String> = BTreeMap::new();
253    selector.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
254    selector.insert("app.kubernetes.io/instance".into(), name.clone());
255
256    let svc: Service = serde_json::from_value(json!({
257        "metadata": {
258            "name": format!("{name}-broker-headless"),
259            "namespace": owner.meta().namespace.clone(),
260            "labels": labels,
261            "ownerReferences": [owner_ref::<Kafka>(owner)?],
262        },
263        "spec": {
264            "clusterIP": "None",
265            "selector": selector,
266            "ports": [{
267                "name": "kafka-internal",
268                "port": BROKER_PORT,
269                "protocol": "TCP",
270                "targetPort": BROKER_PORT,
271            }],
272        }
273    }))?;
274    Ok(svc)
275}
276
277/// Render the cluster-level `ConfigMap`. Owner-ref'd to the parent
278/// `Kafka`. Emits one `broker-{id}.toml` key per entry in
279/// `addresses_per_broker`, generated by
280/// [`crate::controller::listeners::render_broker_toml`].
281#[allow(clippy::too_many_arguments)] // each arg is an independent operator-owned render input
282pub(crate) fn render_configmap(
283    owner: &Kafka,
284    listeners: &[crate::crd::Listener],
285    addresses_per_broker: &std::collections::BTreeMap<
286        i32,
287        std::collections::BTreeMap<String, crate::controller::listeners::AdvertisedAddress>,
288    >,
289    inter_broker_listener_name: &str,
290    tls_per_broker: Option<
291        &std::collections::BTreeMap<i32, crate::controller::listeners::BrokerTlsRender>,
292    >,
293    clients_ca_path: Option<&str>,
294    logging_filter: Option<&str>,
295) -> Result<ConfigMap, ReconcileError> {
296    let name = owner.meta().name.clone().unwrap_or_default();
297    let labels = common_labels(&name, &owner.spec.kafka_version, None);
298
299    let mut data = BTreeMap::new();
300    // Cluster-wide `RUST_LOG` filter, referenced by each broker
301    // pod's `RUST_LOG` env via `configMapKeyRef` (see `render_broker_container`).
302    if let Some(filter) = logging_filter {
303        data.insert("rust.log".to_string(), filter.to_string());
304    }
305    // `metadata.version` is finalized via the bootstrap-seeded feature
306    // record (`crabka format --release-version`), not the broker config —
307    // so it is intentionally not rendered here. An explicit
308    // `spec.metadataVersion` pin still rolls the cluster via the config
309    // hash (see `combined_config_hash`), which is a separate channel.
310    let server_properties = owner.spec.config.clone().unwrap_or_default();
311    // Surface delegation-token enablement to the per-broker
312    // renderer. The `super_users = ["ANONYMOUS"]`
313    // top-level emit is folded into the `[authorization]` block — passing this
314    // flag still drives the auto-injected `[authorization]` shape (or the
315    // ANONYMOUS-merge into a user-authored authorization).
316    let delegation_token_enabled = owner.spec.delegation_token.is_some();
317    // Optional broker authorizer config. `None` ⇒ broker
318    // defaults to AllowAll (or, with delegation tokens enabled, gets the
319    // auto-injected `simple + ANONYMOUS` block — see
320    // `render_broker_toml`).
321    let authorization = owner.spec.authorization.as_ref();
322    // Thread `Kafka.spec.tieredStorage` into each broker's
323    // TOML so the broker-wide `[remote_storage]` block (and the matching
324    // `tier-storage` pod volume) light up together.
325    let tiered_storage = owner.spec.tiered_storage.as_ref();
326    let inter_broker_kerberos = owner.spec.inter_broker_kerberos.as_ref();
327    for (broker_id, addrs) in addresses_per_broker {
328        let tls_for_broker = tls_per_broker.and_then(|m| m.get(broker_id));
329        let toml = crate::controller::listeners::render_broker_toml(
330            *broker_id,
331            listeners,
332            addrs,
333            inter_broker_listener_name,
334            &server_properties,
335            tls_for_broker,
336            clients_ca_path,
337            delegation_token_enabled,
338            authorization,
339            tiered_storage,
340            inter_broker_kerberos,
341        );
342        data.insert(format!("broker-{broker_id}.toml"), toml);
343    }
344
345    Ok(ConfigMap {
346        metadata: ObjectMeta {
347            name: Some(format!("{name}-broker-config")),
348            namespace: owner.meta().namespace.clone(),
349            labels: Some(labels),
350            owner_references: Some(vec![owner_ref::<Kafka>(owner)?]),
351            ..Default::default()
352        },
353        data: Some(data),
354        ..Default::default()
355    })
356}
357
358/// Render the cluster-id `Secret`. Owner-ref'd to the parent `Kafka`.
359/// The `clusterId` value is the canonical hyphenated UUID encoded as
360/// UTF-8 bytes (k8s wraps with base64 on the wire).
361pub(crate) fn render_secret(owner: &Kafka, cluster_id: Uuid) -> Result<Secret, ReconcileError> {
362    let name = owner.meta().name.clone().unwrap_or_default();
363    let labels = common_labels(&name, &owner.spec.kafka_version, None);
364
365    let mut data = BTreeMap::new();
366    data.insert(
367        "clusterId".to_string(),
368        ByteString(cluster_id.to_string().into_bytes()),
369    );
370
371    Ok(Secret {
372        metadata: ObjectMeta {
373            name: Some(format!("{name}-cluster-id")),
374            namespace: owner.meta().namespace.clone(),
375            labels: Some(labels),
376            owner_references: Some(vec![owner_ref::<Kafka>(owner)?]),
377            ..Default::default()
378        },
379        type_: Some("Opaque".into()),
380        data: Some(data),
381        ..Default::default()
382    })
383}
384
385/// Read `data.clusterId` from a Secret, decode the bytes as UTF-8, and
386/// parse the hyphenated UUID. Returns `MalformedSecret` rather than
387/// panicking if the Secret was hand-edited or otherwise unparsable —
388/// the operator should not crash on bad operator input.
389pub(crate) fn uuid_from_secret(secret: &Secret) -> Result<Uuid, ReconcileError> {
390    let data = secret
391        .data
392        .as_ref()
393        .ok_or_else(|| ReconcileError::MalformedSecret("Secret.data is empty".into()))?;
394    let bytes = &data
395        .get("clusterId")
396        .ok_or_else(|| ReconcileError::MalformedSecret("missing clusterId key".into()))?
397        .0;
398    let s = std::str::from_utf8(bytes)
399        .map_err(|e| ReconcileError::MalformedSecret(format!("clusterId not UTF-8: {e}")))?;
400    Uuid::parse_str(s)
401        .map_err(|e| ReconcileError::MalformedSecret(format!("clusterId not a UUID: {e}")))
402}
403
404/// Read a PEM string from a Secret's data field. Returns `None` if the key is
405/// absent, the data map is missing, or the bytes are not valid UTF-8.
406pub(crate) fn read_pem_key(secret: &Secret, key: &str) -> Option<String> {
407    let data = secret.data.as_ref()?;
408    let bytes = &data.get(key)?.0;
409    String::from_utf8(bytes.clone()).ok()
410}
411
412/// Get-or-create the cluster-id Secret. Returns the parsed UUID.
413///
414/// The Secret is created with `Patch::Apply` semantics-equivalent
415/// `POST` (i.e. a plain create) so that an existing Secret is never
416/// overwritten — the cluster id is a one-shot value that must never
417/// change. If the Secret already exists, we read its `clusterId` back.
418pub(crate) async fn ensure_cluster_id_secret(
419    secret_api: &Api<Secret>,
420    parent: &Kafka,
421) -> Result<Uuid, ReconcileError> {
422    let name = parent.meta().name.clone().unwrap_or_default();
423    let secret_name = format!("{name}-cluster-id");
424    if let Some(existing) = secret_api.get_opt(&secret_name).await? {
425        return uuid_from_secret(&existing);
426    }
427    let id = Uuid::new_v4();
428    let secret = render_secret(parent, id)?;
429    secret_api.create(&PostParams::default(), &secret).await?;
430    Ok(id)
431}
432
433/// Pure helper deriving the status fields from the live `StatefulSet`.
434/// Returns `(replicas, readyReplicas, reason, message)`. The caller maps
435/// `reason == "Available"` to `Ready=True`, anything else to `Ready=False`.
436pub(crate) fn derive_status(
437    live: Option<&StatefulSet>,
438    desired_replicas: i32,
439) -> (Option<i32>, Option<i32>, &'static str, String) {
440    let (replicas, ready_replicas) = live
441        .and_then(|s| s.status.as_ref())
442        .map_or((None, None), |st| (Some(st.replicas), st.ready_replicas));
443
444    let ready_count = ready_replicas.unwrap_or(0);
445    if ready_count == desired_replicas {
446        (
447            replicas,
448            ready_replicas,
449            "Available",
450            format!("{desired_replicas} broker(s) ready"),
451        )
452    } else if ready_count == 0 {
453        (
454            replicas,
455            ready_replicas,
456            "NoBrokersReady",
457            format!("0/{desired_replicas} brokers ready"),
458        )
459    } else {
460        (
461            replicas,
462            ready_replicas,
463            "PartiallyReady",
464            format!("{ready_count}/{desired_replicas} brokers ready"),
465        )
466    }
467}
468
469/// Truncated SHA-256 hex digest (16 hex chars / 8 bytes of entropy)
470/// of the given content. Used to detect `Kafka.spec.config`
471/// changes that the K8s `StatefulSet` controller can't see directly.
472///
473/// The full sha256 is 64 hex chars, which exceeds the 63-char K8s
474/// label-value limit. 64 bits of entropy is more than enough for a
475/// drift detector — collisions for accidental config changes are
476/// astronomically unlikely.
477#[must_use]
478pub fn config_hash(content: &str) -> String {
479    use std::fmt::Write;
480
481    use sha2::{Digest, Sha256};
482    let mut h = Sha256::new();
483    h.update(content.as_bytes());
484    let digest = h.finalize();
485    let mut out = String::with_capacity(16);
486    for byte in digest.iter().take(8) {
487        write!(&mut out, "{byte:02x}").expect("writing to a String never fails");
488    }
489    out
490}
491
492/// Combined hash over user `spec.config`, the
493/// canonical listener intent, a `metrics_config.is_some()` bit, and
494/// the cluster CA cert PEM.
495/// Empty listeners produce empty intent and metrics-unset produces an
496/// empty third segment, so the combined hash is identical to the
497/// bare `config_hash(spec.config)` for an unchanged `spec.config` with neither listeners
498/// nor metrics.
499///
500/// The metrics segment is a coarse `metrics_enabled` bit, not a hash of
501/// the full `metrics_config` body — toggling `metricsConfig` on/off
502/// changes the broker pod template (adds/removes the metrics port + CLI
503/// flag) and so must trigger a pool reconcile (which re-renders the
504/// `StatefulSet`). Sub-field changes (interval, scrape labels) affect
505/// only the `PodMonitor`/`ServiceMonitor` objects, not the broker pod,
506/// and do not need a roll.
507///
508/// `cluster_ca_cert_pem` — when `Some`, the cluster CA cert PEM is
509/// included as a fourth segment. Rotating the cluster CA forces a
510/// cluster roll; leaf renewal does not (hot-reload handles it).
511///
512/// `metadata_version_pin` — when `Some`, an *explicit*
513/// `spec.metadataVersion` pin is included as a fifth segment, so changing
514/// the pin rolls the cluster. A *defaulted* metadata version is passed as
515/// `None` here (a binary bump already rolls via the pod-template image
516/// change), which preserves the empty-hash collapse.
517///
518/// `logging_filter` — when `Some`, the resolved `RUST_LOG`
519/// env-filter string is included as a sixth segment. The broker only re-reads
520/// `RUST_LOG` on restart, so a *value* change (not just on/off) must roll the
521/// cluster. `None` (logging unset, or external resolution failed) contributes
522/// an empty segment, preserving the empty-hash collapse.
523#[must_use]
524pub fn combined_config_hash(
525    spec: &crate::crd::KafkaSpec,
526    cluster_ca_cert_pem: Option<&str>,
527    metadata_version_pin: Option<&str>,
528    logging_filter: Option<&str>,
529) -> String {
530    let config_part = spec
531        .config
532        .as_ref()
533        .map(|m| {
534            let mut s = String::new();
535            for (k, v) in m {
536                s.push_str(k);
537                s.push('=');
538                s.push_str(v);
539                s.push('\n');
540            }
541            s
542        })
543        .unwrap_or_default();
544    let intent = crate::controller::listeners::canonical_listener_intent(
545        &spec.listeners,
546        spec.inter_broker_listener_name.as_deref(),
547    );
548    let metrics_part = if spec.metrics_config.is_some() {
549        "metrics=on"
550    } else {
551        ""
552    };
553    let ca_part = cluster_ca_cert_pem.unwrap_or("");
554    let metadata_part = metadata_version_pin.unwrap_or("");
555    let logging_part = logging_filter.unwrap_or("");
556    // Hash-collapse compatibility: when listeners, metricsConfig, the CA cert,
557    // an explicit metadataVersion pin, and logging are all absent, the hash
558    // collapses to `config_hash(config_part)` — byte-identical to the
559    // bare config hash for the same `spec.config`. This is what makes an
560    // in-place upgrade from a config-only cluster not trigger a hash-driven roll (the
561    // unavoidable template-change roll fires separately and once).
562    if intent.is_empty()
563        && metrics_part.is_empty()
564        && ca_part.is_empty()
565        && metadata_part.is_empty()
566        && logging_part.is_empty()
567    {
568        return config_hash(&config_part);
569    }
570    let mut buf = String::with_capacity(
571        config_part.len()
572            + 5
573            + intent.len()
574            + metrics_part.len()
575            + ca_part.len()
576            + metadata_part.len()
577            + logging_part.len(),
578    );
579    buf.push_str(&config_part);
580    buf.push('\x1F'); // ASCII unit separator
581    buf.push_str(&intent);
582    buf.push('\x1F');
583    buf.push_str(metrics_part);
584    buf.push('\x1F');
585    buf.push_str(ca_part);
586    buf.push('\x1F');
587    buf.push_str(metadata_part);
588    buf.push('\x1F');
589    buf.push_str(logging_part);
590    config_hash(&buf)
591}
592
/// One pool's observed state, fed to [`plan_rollout`]. `current_hash` is
/// the pool's `crabka.io/config-hash` label (`None` if never stamped);
/// `ready` is whether the pool's single broker has reached Ready.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PoolRolloutState {
    /// Pool name; `plan_rollout` echoes it back as the first element of each
    /// result pair.
    pub name: String,
    /// Config hash currently stamped on the pool, or `None` if it was never
    /// stamped.
    pub current_hash: Option<String>,
    /// Whether the pool's broker has reached Ready.
    pub ready: bool,
}
602
603/// Decide the config-hash to write to each pool for an ordered,
604/// one-node-at-a-time rollout. `pools` must be pre-sorted into the desired
605/// roll order (by `(node_id_start, name)`). Returns the target hash per
606/// pool name, in the same order.
607///
608/// - **Bring-up / recovery** — if any pool has no current hash, or there
609///   is more than one distinct *non-desired* hash among pools, every pool
610///   gets `desired` (parallel). This is required so a `KRaft` controller
611///   quorum can form: gating initial creation one-at-a-time would deadlock
612///   (a single controller can't reach Ready without quorum). Also the
613///   single-pool first-reconcile path.
614/// - **Steady state** — if every pool already carries `desired`, all stay
615///   `desired` (no-op).
616/// - **Established roll** — otherwise the cluster is uniform on one old
617///   hash (or mid-roll between `{old, desired}`) and transitioning. Walk
618///   pools in order; a pool is *converged* when it already carries
619///   `desired` AND is Ready. Advance the first non-converged pool to
620///   `desired`; every later pool keeps its current hash until the earlier
621///   pools converge.
622pub(crate) fn plan_rollout(pools: &[PoolRolloutState], desired: &str) -> Vec<(String, String)> {
623    let all_have_hash = pools.iter().all(|p| p.current_hash.is_some());
624    let distinct_non_desired: std::collections::BTreeSet<&str> = pools
625        .iter()
626        .filter_map(|p| p.current_hash.as_deref())
627        .filter(|h| *h != desired)
628        .collect();
629
630    // Bring-up / recovery / messy state → everyone gets `desired`.
631    if !all_have_hash || distinct_non_desired.len() > 1 {
632        return pools
633            .iter()
634            .map(|p| (p.name.clone(), desired.to_string()))
635            .collect();
636    }
637
638    // Established cluster: advance one pool at a time, gated on readiness.
639    let mut gate_open = true;
640    let mut out = Vec::with_capacity(pools.len());
641    for p in pools {
642        if gate_open {
643            let converged = p.current_hash.as_deref() == Some(desired) && p.ready;
644            // This pool advances to (or already holds) `desired`.
645            out.push((p.name.clone(), desired.to_string()));
646            if !converged {
647                // Hold every later pool at its current hash until this one
648                // converges.
649                gate_open = false;
650            }
651        } else {
652            // Keep the existing hash; `all_have_hash` guarantees `Some`.
653            let keep = p
654                .current_hash
655                .clone()
656                .unwrap_or_else(|| desired.to_string());
657            out.push((p.name.clone(), keep));
658        }
659    }
660    out
661}
662
/// Parse a K8s `Quantity` string into a comparable byte count.
///
/// Accepts:
/// - Binary suffixes: `Ki`, `Mi`, `Gi`, `Ti`, `Pi`, `Ei` (1 Ki = 1024).
/// - Decimal suffixes: `k`, `M`, `G`, `T`, `P`, `E` (1 k = 1000). Kubernetes
///   spells kilo as lowercase `k`; uppercase `K` is also accepted for
///   backward compatibility with earlier versions of this parser.
/// - Bare numbers (no suffix → bytes).
/// - Integer or decimal mantissa (`1.5Gi`).
///
/// Rejects: scientific notation, negative numbers, zero, empty
/// strings, or any value that doesn't match `<mantissa><suffix?>`.
///
/// Returns the byte count as `i128` (1.5 Pi fits with headroom for
/// arithmetic). The result is only used for ordered comparison — it is never
/// round-tripped back to a string, so sub-byte truncation from the `f64`
/// intermediate is acceptable. The in-tree implementation is small and saves
/// a workspace dependency; no third-party Quantity parser is wired into
/// Crabka yet.
///
/// # Errors
///
/// Returns a static `&str` describing the parse failure.
#[allow(
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss
)]
pub(crate) fn parse_quantity(s: &str) -> Result<i128, &'static str> {
    /// Recognised suffixes and their byte multipliers. No single-letter
    /// suffix ends in `i`, so the two families cannot shadow each other.
    const SUFFIXES: [(&str, i128); 13] = [
        ("Ki", 1 << 10),
        ("Mi", 1 << 20),
        ("Gi", 1 << 30),
        ("Ti", 1 << 40),
        ("Pi", 1 << 50),
        ("Ei", 1 << 60),
        ("k", 1_000),
        ("K", 1_000),
        ("M", 1_000_000),
        ("G", 1_000_000_000),
        ("T", 1_000_000_000_000),
        ("P", 1_000_000_000_000_000),
        ("E", 1_000_000_000_000_000_000),
    ];

    if s.is_empty() {
        return Err("empty quantity string");
    }

    let (mantissa_str, multiplier) = SUFFIXES
        .iter()
        .find_map(|&(suffix, factor)| s.strip_suffix(suffix).map(|rest| (rest, factor)))
        .unwrap_or((s, 1));

    if mantissa_str.is_empty() {
        return Err("missing numeric mantissa before suffix");
    }
    // `f64::from_str` would happily accept `1e3`; reject it explicitly.
    if mantissa_str.contains(['e', 'E']) {
        return Err("scientific notation not supported");
    }
    if mantissa_str.starts_with('-') {
        return Err("negative quantity rejected");
    }

    let mantissa: f64 = mantissa_str
        .parse()
        .map_err(|_| "mantissa is not a valid number")?;
    // `f64::from_str` also accepts spellings such as `inf` and `NaN`.
    if !mantissa.is_finite() {
        return Err("mantissa is not finite");
    }
    if mantissa <= 0.0 {
        return Err("quantity must be strictly positive");
    }

    let bytes = mantissa * multiplier as f64;
    // `i128::MAX as f64` rounds up to exactly 2^127, which itself does not fit
    // in an `i128`; compare with `>=` so that value is rejected rather than
    // silently saturated by the cast below.
    if bytes >= i128::MAX as f64 {
        return Err("quantity overflows i128");
    }
    Ok(bytes as i128)
}
748
#[cfg(test)]
mod config_hash_tests {
    use super::*;
    use crate::controller::listeners::{AdvertisedAddress, synthesized_default_listener};
    use crate::crd::KafkaSpec;
    use assert2::assert;

    /// Baseline spec shared by every test: no listeners and every optional
    /// field unset. Tests override only the fields they exercise, so a new
    /// `KafkaSpec` field needs to be added here once rather than per test.
    fn bare_spec(kafka_version: &str) -> KafkaSpec {
        KafkaSpec {
            kafka_version: kafka_version.into(),
            metadata_version: None,
            config: None,
            listeners: vec![],
            inter_broker_listener_name: None,
            metrics_config: None,
            network_policy: None,
            cluster_ca: None,
            clients_ca: None,
            logging: None,
            delegation_token: None,
            authorization: None,
            tiered_storage: None,
            inter_broker_kerberos: None,
            krb5_conf_secret_ref: None,
            tracing: None,
        }
    }

    /// `Kafka` resource named `demo` in namespace `default` with a
    /// placeholder uid, wrapping the given spec.
    fn demo_kafka(spec: KafkaSpec) -> Kafka {
        let mut k = Kafka::new("demo", spec);
        k.meta_mut().namespace = Some("default".into());
        k.meta_mut().uid = Some("uid".into());
        k
    }

    /// Advertised address for `host` on port 9092.
    fn plain_addr(host: &str) -> AdvertisedAddress {
        AdvertisedAddress {
            host: host.into(),
            port: 9092,
        }
    }

    #[test]
    fn config_hash_is_truncated_sha256_hex() {
        // First 16 hex chars (8 bytes) of sha256("hello"):
        //   2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
        //   ^^^^^^^^^^^^^^^^
        let h = config_hash("hello");
        assert!(h == "2cf24dba5fb0a30e");
        assert!(h.len() == 16, "must fit within K8s 63-char label limit");
    }

    #[test]
    fn config_hash_empty_string() {
        // First 16 hex chars of sha256("").
        let h = config_hash("");
        assert!(h == "e3b0c44298fc1c14");
    }

    #[test]
    fn config_hash_fits_in_kubernetes_label_value() {
        // K8s label values are limited to 63 characters. Our truncated
        // hash must always fit; this test guards against future widening.
        let h = config_hash("any content at all");
        assert!(h.len() <= 63, "hash {h} exceeds K8s label limit");
    }

    #[test]
    fn combined_hash_unchanged_when_listeners_empty() {
        let spec_a = KafkaSpec {
            config: Some({
                let mut m = std::collections::BTreeMap::new();
                m.insert("log.retention.hours".into(), "24".into());
                m
            }),
            ..bare_spec("0.1.1")
        };
        let h = combined_config_hash(&spec_a, None, None, None);
        let h_again = combined_config_hash(&spec_a, None, None, None);
        assert!(h == h_again);

        // Hash-collapse compat: the hash for empty listeners + no metrics MUST
        // equal `config_hash(serialized broker-properties)`. That's what
        // lets an in-place config-only upgrade avoid a
        // hash-driven roll (the e2e job `kind-upgrade` asserts this
        // against a real config-only cluster).
        let config_only_form = "log.retention.hours=24\n";
        assert!(
            h == config_hash(config_only_form),
            "combined hash for empty listeners must equal config_hash(spec.config)"
        );

        let mut spec_b = spec_a.clone();
        spec_b.listeners = vec![synthesized_default_listener()];
        spec_b.inter_broker_listener_name = Some("PLAIN".into());
        let h_with_listener = combined_config_hash(&spec_b, None, None, None);
        assert!(
            h != h_with_listener,
            "non-empty listener intent must change hash"
        );
    }

    #[test]
    fn combined_hash_flips_when_metrics_config_toggles() {
        use crate::crd::{MetricsConfig, PodMonitorSpec};

        let spec_off = bare_spec("0.1.1");
        let h_off = combined_config_hash(&spec_off, None, None, None);

        let mut spec_on = spec_off.clone();
        spec_on.metrics_config = Some(MetricsConfig {
            pod_monitor: Some(PodMonitorSpec::default()),
            ..Default::default()
        });
        let h_on = combined_config_hash(&spec_on, None, None, None);
        assert!(
            h_off != h_on,
            "enabling metrics_config must bump the hash (triggers pool reconcile + StatefulSet re-render)"
        );

        // Toggling sub-fields (interval, labels) does NOT change the
        // hash — those only affect the PodMonitor/ServiceMonitor body,
        // not the broker pod template, so they must not trigger a roll.
        let mut spec_on_diff_interval = spec_on.clone();
        if let Some(cfg) = spec_on_diff_interval.metrics_config.as_mut() {
            cfg.pod_monitor = Some(PodMonitorSpec {
                interval: Some("60s".into()),
                ..Default::default()
            });
        }
        assert!(
            h_on == combined_config_hash(&spec_on_diff_interval, None, None, None),
            "PodMonitor interval change must NOT roll the broker pod"
        );
    }

    #[test]
    fn combined_hash_changes_when_cluster_ca_cert_changes() {
        let spec = bare_spec("0.1.1");
        let h_none = combined_config_hash(&spec, None, None, None);
        let h_a = combined_config_hash(
            &spec,
            Some("-----BEGIN CERTIFICATE-----\nA\n-----END CERTIFICATE-----\n"),
            None,
            None,
        );
        let h_b = combined_config_hash(
            &spec,
            Some("-----BEGIN CERTIFICATE-----\nB\n-----END CERTIFICATE-----\n"),
            None,
            None,
        );
        assert!(h_none != h_a, "absent vs present CA must differ");
        assert!(h_a != h_b, "different CA PEM must differ");
    }

    #[test]
    fn combined_hash_stable_under_broker_keystore_changes() {
        // The keystore Secret's contents are never inputs to
        // combined_config_hash (hot-reload handles leaf renewal).
        // This test guards against a future regression where someone wires
        // a keystore digest into the hash.
        let spec = bare_spec("0.1.1");
        let h1 = combined_config_hash(&spec, Some("ca-pem"), None, None);
        let h2 = combined_config_hash(&spec, Some("ca-pem"), None, None);
        assert!(h1 == h2);
    }

    #[test]
    fn configmap_has_one_toml_key_per_broker() {
        let k = demo_kafka(bare_spec("0.1.1"));

        let listeners = vec![synthesized_default_listener()];
        let mut per_broker = std::collections::BTreeMap::new();
        for (broker_id, host) in [(0i32, "demo-0.svc"), (1i32, "demo-1.svc")] {
            let mut addrs = std::collections::BTreeMap::new();
            addrs.insert("PLAIN".into(), plain_addr(host));
            per_broker.insert(broker_id, addrs);
        }

        let cm = render_configmap(&k, &listeners, &per_broker, "PLAIN", None, None, None).unwrap();
        let data = cm.data.unwrap();
        assert!(data.contains_key("broker-0.toml"));
        assert!(data.contains_key("broker-1.toml"));
        assert!(data["broker-0.toml"].contains("demo-0.svc"));
        assert!(data["broker-1.toml"].contains("demo-1.svc"));
        // The old broker.env / broker.properties keys are dropped.
        assert!(!data.contains_key("broker.env"));
        assert!(!data.contains_key("broker.properties"));
    }

    #[test]
    fn combined_hash_changes_when_metadata_version_pin_set() {
        let spec = bare_spec("3.7.0");
        // No explicit pin => hash collapse preserved (== config_hash of
        // the empty config part).
        let h_default = combined_config_hash(&spec, None, None, None);
        assert!(h_default == config_hash(""));

        // An explicit pin enters the hash and changes it.
        let h_pin = combined_config_hash(&spec, None, Some("3.6"), None);
        assert!(h_default != h_pin, "explicit metadata pin must change hash");
        // A different pin differs again.
        let h_pin2 = combined_config_hash(&spec, None, Some("3.7"), None);
        assert!(h_pin != h_pin2, "different metadata pin must differ");
    }

    #[test]
    fn configmap_never_injects_metadata_version_into_server_properties() {
        // Even with an explicit `spec.metadataVersion` pin, the rendered
        // broker config must not carry `metadata.version` — it is finalized
        // via the bootstrap feature record, not the config channel.
        let k = demo_kafka(KafkaSpec {
            metadata_version: Some("3.6".into()),
            ..bare_spec("3.7.0")
        });

        let listeners = vec![synthesized_default_listener()];
        let mut per_broker = std::collections::BTreeMap::new();
        let mut addrs0 = std::collections::BTreeMap::new();
        addrs0.insert("PLAIN".into(), plain_addr("demo-0.svc"));
        per_broker.insert(0i32, addrs0);

        let cm = render_configmap(&k, &listeners, &per_broker, "PLAIN", None, None, None).unwrap();
        let toml = &cm.data.unwrap()["broker-0.toml"];
        assert!(
            !toml.contains("metadata.version"),
            "metadata.version must never be injected into broker config, got:\n{toml}"
        );
    }
}
1095
#[cfg(test)]
mod rollout_tests {
    use super::{PoolRolloutState, plan_rollout};
    use assert2::assert;

    /// Builds the observed state of one pool: its name, the hash it
    /// currently carries (if any), and whether it is Ready.
    fn pool(name: &str, current_hash: Option<&str>, ready: bool) -> PoolRolloutState {
        let current_hash = current_hash.map(String::from);
        PoolRolloutState {
            name: name.into(),
            current_hash,
            ready,
        }
    }

    /// Borrows a plan as `(pool name, target hash)` string-slice pairs so
    /// expectations can be written with literals.
    fn pairs(plan: &[(String, String)]) -> Vec<(&str, &str)> {
        let mut out = Vec::with_capacity(plan.len());
        for (pool_name, target_hash) in plan {
            out.push((pool_name.as_str(), target_hash.as_str()));
        }
        out
    }

    #[test]
    fn bring_up_all_get_desired_when_no_hash() {
        // Initial creation: no pool carries a hash yet, so every pool is
        // handed `desired` at once (parallel) and a KRaft controller quorum
        // can form.
        let pools = vec![
            pool("a", None, false),
            pool("b", None, false),
            pool("c", None, false),
        ];
        let expected = vec![("a", "H1"), ("b", "H1"), ("c", "H1")];
        assert!(pairs(&plan_rollout(&pools, "H1")) == expected);
    }

    #[test]
    fn single_pool_first_reconcile_gets_desired() {
        let pools = vec![pool("only", None, false)];
        let plan = plan_rollout(&pools, "H1");
        assert!(pairs(&plan) == vec![("only", "H1")]);
    }

    #[test]
    fn single_pool_roll_advances() {
        // A lone, already-established pool moves straight to the new hash.
        let pools = vec![pool("only", Some("H0"), true)];
        let plan = plan_rollout(&pools, "H1");
        assert!(pairs(&plan) == vec![("only", "H1")]);
    }

    #[test]
    fn steady_state_all_desired_is_noop() {
        let pools = vec![pool("a", Some("H1"), true), pool("b", Some("H1"), true)];
        let plan = plan_rollout(&pools, "H1");
        assert!(pairs(&plan) == vec![("a", "H1"), ("b", "H1")]);
    }

    #[test]
    fn established_roll_advances_first_pool_only() {
        // All pools sit on H0. The first reconcile after the change moves
        // only `a`; `b` and `c` stay pinned to H0.
        let pools = vec![
            pool("a", Some("H0"), true),
            pool("b", Some("H0"), true),
            pool("c", Some("H0"), true),
        ];
        let expected = vec![("a", "H1"), ("b", "H0"), ("c", "H0")];
        assert!(pairs(&plan_rollout(&pools, "H1")) == expected);
    }

    #[test]
    fn established_roll_holds_later_pools_until_first_ready() {
        // `a` is already on H1 but not Ready yet, so `b` and `c` keep waiting.
        let pools = vec![
            pool("a", Some("H1"), false),
            pool("b", Some("H0"), true),
            pool("c", Some("H0"), true),
        ];
        let expected = vec![("a", "H1"), ("b", "H0"), ("c", "H0")];
        assert!(pairs(&plan_rollout(&pools, "H1")) == expected);
    }

    #[test]
    fn established_roll_advances_next_after_prefix_converges() {
        // `a` has converged (H1 and Ready): `b` advances next, `c` is held.
        let pools = vec![
            pool("a", Some("H1"), true),
            pool("b", Some("H0"), true),
            pool("c", Some("H0"), true),
        ];
        let expected = vec![("a", "H1"), ("b", "H1"), ("c", "H0")];
        assert!(pairs(&plan_rollout(&pools, "H1")) == expected);
    }

    #[test]
    fn messy_multiple_old_hashes_falls_back_to_all_desired() {
        // Two or more distinct non-desired hashes means this is not a clean
        // ordered roll; every pool is given `desired` (recovery).
        let pools = vec![pool("a", Some("H0"), true), pool("b", Some("HX"), true)];
        let expected = vec![("a", "H1"), ("b", "H1")];
        assert!(pairs(&plan_rollout(&pools, "H1")) == expected);
    }
}
1191
#[cfg(test)]
mod parse_quantity_tests {
    use super::parse_quantity;
    use assert2::assert;

    #[test]
    fn quantity_parse_binary_suffixes() {
        let cases: [(&str, i128); 3] = [
            ("1Ki", 1024),
            ("512Mi", 512 * 1024 * 1024),
            ("10Gi", 10 * 1024 * 1024 * 1024),
        ];
        for (input, expected) in cases {
            assert!(parse_quantity(input).unwrap() == expected);
        }
    }

    #[test]
    fn quantity_parse_decimal_suffixes() {
        let cases: [(&str, i128); 3] = [
            ("1K", 1_000),
            ("500M", 500_000_000),
            ("10G", 10_000_000_000),
        ];
        for (input, expected) in cases {
            assert!(parse_quantity(input).unwrap() == expected);
        }
    }

    #[test]
    fn quantity_parse_decimal_mantissa() {
        // 1.5 * 1024^3 bytes = 1,610,612,736 bytes.
        let parsed = parse_quantity("1.5Gi").unwrap();
        assert!(parsed == 1_610_612_736);
    }

    #[test]
    fn quantity_parse_no_suffix_is_bytes() {
        let parsed = parse_quantity("1024").unwrap();
        assert!(parsed == 1024);
    }

    #[test]
    fn quantity_parse_rejects_garbage() {
        // The last entry checks that scientific notation is refused.
        for input in ["", "banana", "1.5x", "Gi", "1e3"] {
            assert!(parse_quantity(input).is_err(), "{input:?} must be rejected");
        }
    }

    #[test]
    fn quantity_parse_zero_and_negative_are_errors() {
        for input in ["0", "0Gi", "-10Gi"] {
            assert!(parse_quantity(input).is_err(), "{input:?} must be rejected");
        }
    }
}