Skip to main content

pgroles_operator/
crd.rs

1//! Custom Resource Definition for `PostgresPolicy`.
2//!
3//! Defines the `pgroles.io/v1alpha1` CRD that the operator watches.
4//! The spec mirrors the CLI manifest schema with additional fields for
5//! database connection and reconciliation scheduling.
6
7use kube::CustomResource;
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10use std::collections::{BTreeMap, BTreeSet};
11
12use pgroles_core::manifest::{
13    DefaultPrivilege, Grant, Membership, ObjectType, Privilege, RoleRetirement, SchemaBinding,
14};
15
/// Valid PostgreSQL SSL modes for connection params.
///
/// The entries are the literal `sslmode` strings, listed in the same order as
/// the "expected one of" hint in `ConnectionValidationError::InvalidSslMode`.
pub const VALID_SSL_MODES: &[&str] = &[
    "disable",
    "allow",
    "prefer",
    "require",
    "verify-ca",
    "verify-full",
];
25
26// ---------------------------------------------------------------------------
27// CRD spec
28// ---------------------------------------------------------------------------
29
/// Spec for a `PostgresPolicy` custom resource.
///
/// Defines the desired state of PostgreSQL roles, grants, default privileges,
/// and memberships for a single database connection.
///
/// This struct carries no `#[serde(rename_all = ...)]`, so its fields are
/// serialized under their Rust (snake_case) names. The print-column JSON paths
/// below (e.g. `.spec.reconciliation_mode`) rely on that.
#[derive(CustomResource, Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[kube(
    group = "pgroles.io",
    version = "v1alpha1",
    kind = "PostgresPolicy",
    namespaced,
    status = "PostgresPolicyStatus",
    shortname = "pgr",
    category = "pgroles",
    printcolumn = r#"{"name":"Ready","type":"string","jsonPath":".status.conditions[?(@.type==\"Ready\")].status"}"#,
    printcolumn = r#"{"name":"Mode","type":"string","jsonPath":".spec.mode"}"#,
    printcolumn = r#"{"name":"Recon","type":"string","jsonPath":".spec.reconciliation_mode","priority":1}"#,
    printcolumn = r#"{"name":"Drift","type":"string","jsonPath":".status.conditions[?(@.type==\"Drifted\")].status"}"#,
    printcolumn = r#"{"name":"Changes","type":"integer","jsonPath":".status.change_summary.total"}"#,
    printcolumn = r#"{"name":"Last Reconcile","type":"date","jsonPath":".status.last_successful_reconcile_time"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
pub struct PostgresPolicySpec {
    /// Database connection configuration.
    pub connection: ConnectionSpec,

    /// Reconciliation interval (e.g. "5m", "1h"). Defaults to "5m".
    ///
    /// Stored as an unparsed string; this type does not validate the format.
    #[serde(default = "default_interval")]
    pub interval: String,

    /// Suspend reconciliation when true. Defaults to false.
    #[serde(default)]
    pub suspend: bool,

    /// Reconciliation mode: `apply` executes SQL, `plan` computes drift only.
    /// Defaults to `apply`.
    #[serde(default)]
    pub mode: PolicyMode,

    /// Convergence strategy: how aggressively to converge the database.
    ///
    /// - `authoritative` (default): full convergence — anything not in the
    ///   manifest is revoked/dropped.
    /// - `additive`: only grant, never revoke — safe for incremental adoption.
    /// - `adopt`: manage declared roles fully, but never drop undeclared roles.
    #[serde(default)]
    pub reconciliation_mode: CrdReconciliationMode,

    /// Default owner for ALTER DEFAULT PRIVILEGES (e.g. "app_owner").
    #[serde(default)]
    pub default_owner: Option<String>,

    /// Reusable privilege profiles, keyed by profile name.
    // NOTE(review): `HashMap` iteration order is unspecified, so re-serializing
    // this spec may emit profiles in a different order each time — confirm
    // nothing downstream depends on a stable ordering.
    #[serde(default)]
    pub profiles: std::collections::HashMap<String, ProfileSpec>,

    /// Schema bindings that expand profiles into concrete roles/grants.
    #[serde(default)]
    pub schemas: Vec<SchemaBinding>,

    /// One-off role definitions.
    #[serde(default)]
    pub roles: Vec<RoleSpec>,

    /// One-off grants.
    #[serde(default)]
    pub grants: Vec<Grant>,

    /// One-off default privileges.
    #[serde(default)]
    pub default_privileges: Vec<DefaultPrivilege>,

    /// Membership edges.
    #[serde(default)]
    pub memberships: Vec<Membership>,

    /// Explicit role-retirement workflows for roles that should be removed.
    #[serde(default)]
    pub retirements: Vec<RoleRetirement>,

    /// Approval mode for plans: `auto` or `manual`.
    /// When `manual`, plans require explicit approval before execution.
    /// When `auto`, plans are approved and applied immediately.
    /// When omitted, inferred from `mode`: `apply` → `auto`, `plan` → `manual`.
    /// This ensures backward compatibility for existing `mode: apply` users.
    ///
    /// Omitted from serialized output when unset; use
    /// `PostgresPolicySpec::effective_approval` to obtain the resolved value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval: Option<ApprovalMode>,
}
116
/// Serde default for `PostgresPolicySpec::interval`: five minutes, as `"5m"`.
fn default_interval() -> String {
    String::from("5m")
}
120
/// Policy reconcile mode.
///
/// Serialized in lowercase (`apply` / `plan`). `Apply` is the default.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum PolicyMode {
    /// Execute the computed SQL (default).
    #[default]
    Apply,
    /// Compute drift only.
    Plan,
}
129
/// Convergence strategy for how aggressively to converge the database.
///
/// Serialized in lowercase (`authoritative` / `additive` / `adopt`). This is
/// the CRD-side counterpart of `pgroles_core::diff::ReconciliationMode`; a
/// `From` impl in this file maps each variant to the same-named core variant.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CrdReconciliationMode {
    /// Full convergence — the manifest is the entire truth. This is the default.
    #[default]
    Authoritative,
    /// Only grant, never revoke — safe for incremental adoption.
    Additive,
    /// Manage declared roles fully, but never drop undeclared roles.
    Adopt,
}
142
143/// Approval mode for plans generated by this policy.
144#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
145pub enum ApprovalMode {
146    /// Plans require explicit approval annotation before execution.
147    #[serde(rename = "manual")]
148    Manual,
149    /// Plans are approved and applied automatically.
150    #[serde(rename = "auto")]
151    Auto,
152}
153
154impl PostgresPolicySpec {
155    /// Resolve the effective approval mode, inferring from `mode` when not set.
156    /// `apply` → `Auto` (backward compat), `plan` → `Manual`.
157    pub fn effective_approval(&self) -> ApprovalMode {
158        match &self.approval {
159            Some(mode) => mode.clone(),
160            None => match self.mode {
161                PolicyMode::Apply => ApprovalMode::Auto,
162                PolicyMode::Plan => ApprovalMode::Manual,
163            },
164        }
165    }
166}
167
168// ---------------------------------------------------------------------------
169// Well-known annotations and labels
170// ---------------------------------------------------------------------------
171
/// Annotation key used to approve a `PostgresPolicyPlan`.
///
/// Like every key in this group, it lives under the `pgroles.io/` prefix,
/// matching the CRD API group.
pub const PLAN_APPROVED_ANNOTATION: &str = "pgroles.io/approved";

/// Annotation key used to reject a `PostgresPolicyPlan`.
pub const PLAN_REJECTED_ANNOTATION: &str = "pgroles.io/rejected";

/// Label key for the parent policy name on plan resources.
pub const LABEL_POLICY: &str = "pgroles.io/policy";

/// Label key for the managed database identity on plan resources.
pub const LABEL_DATABASE_IDENTITY: &str = "pgroles.io/database-identity";

/// Label key for the plan name on SQL storage resources.
pub const LABEL_PLAN: &str = "pgroles.io/plan";
186
187impl From<CrdReconciliationMode> for pgroles_core::diff::ReconciliationMode {
188    fn from(crd: CrdReconciliationMode) -> Self {
189        match crd {
190            CrdReconciliationMode::Authoritative => {
191                pgroles_core::diff::ReconciliationMode::Authoritative
192            }
193            CrdReconciliationMode::Additive => pgroles_core::diff::ReconciliationMode::Additive,
194            CrdReconciliationMode::Adopt => pgroles_core::diff::ReconciliationMode::Adopt,
195        }
196    }
197}
198
/// Database connection configuration.
///
/// Supports two mutually exclusive modes:
///
/// **Mode 1 — Single URL** (backward-compatible):
/// ```yaml
/// connection:
///   secretRef: { name: my-secret }
///   secretKey: DATABASE_URL        # optional, defaults to DATABASE_URL
/// ```
///
/// **Mode 2 — Structured params** (for Zalando/CNPG/PGO secrets):
/// ```yaml
/// connection:
///   params:
///     host: my-cluster-postgres
///     port: 5432
///     dbname: mydb
///     usernameSecret: { name: zalando-creds, key: username }
///     passwordSecret: { name: zalando-creds, key: password }
/// ```
///
/// Params mode can also use provider-backed authentication instead of a static
/// password, for example GKE Workload Identity to Cloud SQL IAM.
///
/// Every field is optional at the type level, so the "exactly one mode" rule is
/// not enforced by deserialization; `ConnectionValidationError` has
/// `BothModesSet` / `NeitherModeSet` variants for reporting violations.
/// Fields are serialized in camelCase (`secretRef`, `secretKey`, `params`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionSpec {
    /// Reference to a Kubernetes Secret containing a connection URL.
    /// Mutually exclusive with `params`.
    #[serde(default)]
    pub secret_ref: Option<SecretReference>,

    /// Key within the Secret to read. Defaults to `DATABASE_URL`.
    /// Only used with `secretRef`.
    #[serde(default)]
    pub secret_key: Option<String>,

    /// Structured connection parameters. Each field is either a plain string
    /// or a reference to a Secret key. Mutually exclusive with `secretRef`.
    #[serde(default)]
    pub params: Option<ConnectionParams>,
}
241
242impl ConnectionSpec {
243    /// Effective secret key for URL mode. Defaults to `DATABASE_URL`.
244    pub fn effective_secret_key(&self) -> &str {
245        self.secret_key.as_deref().unwrap_or("DATABASE_URL")
246    }
247
248    /// Collect all Secret names referenced by this connection spec.
249    pub fn collect_secret_names(&self, names: &mut BTreeSet<String>) {
250        if let Some(ref secret_ref) = self.secret_ref {
251            names.insert(secret_ref.name.clone());
252        }
253        if let Some(ref params) = self.params {
254            for sel in [
255                &params.host_secret,
256                &params.port_secret,
257                &params.dbname_secret,
258                &params.username_secret,
259                &params.password_secret,
260                &params.ssl_mode_secret,
261            ]
262            .into_iter()
263            .flatten()
264            {
265                names.insert(sel.name.clone());
266            }
267        }
268    }
269
270    /// Deterministic identity key for this connection spec.
271    ///
272    /// - URL mode: `{secret_ref.name}/{secret_key}`
273    /// - Params mode: canonical representation of the params
274    ///
275    /// Uses `\0` as field separator since null bytes cannot appear in K8s names
276    /// or secret values, avoiding ambiguity from colons in literal values.
277    /// Deterministic identity key for per-database locking and conflict detection.
278    ///
279    /// Identifies the target database (host + port + dbname) but NOT the
280    /// credentials. Two policies targeting the same database with different
281    /// users should still be considered as targeting the same database for
282    /// locking and overlap checks.
283    pub fn identity_key(&self) -> String {
284        if let Some(ref secret_ref) = self.secret_ref {
285            format!("{}/{}", secret_ref.name, self.effective_secret_key())
286        } else if let Some(ref params) = self.params {
287            let port_part = params
288                .port
289                .as_ref()
290                .map(|p| format!("literal={p}"))
291                .or_else(|| {
292                    params
293                        .port_secret
294                        .as_ref()
295                        .map(|s| format!("secret={}\0{}", s.name, s.key))
296                })
297                .unwrap_or_else(|| "5432".to_string());
298            format!(
299                "params\0{}\0{}\0{}",
300                field_identity_repr(&params.host, &params.host_secret),
301                field_identity_repr(&params.dbname, &params.dbname_secret),
302                port_part,
303            )
304        } else {
305            "invalid-connection".to_string()
306        }
307    }
308
309    /// Cache key for pool lookup. Includes ALL connection params so that any
310    /// configuration change (credentials, sslMode, host, etc.) invalidates
311    /// the cached pool. This is strictly more specific than `identity_key`.
312    pub fn cache_key(&self, namespace: &str) -> String {
313        if let Some(ref params) = self.params {
314            let user_part = field_identity_repr(&params.username, &params.username_secret);
315            let pass_part = field_identity_repr(&params.password, &params.password_secret);
316            let auth_part = params
317                .auth
318                .as_ref()
319                .map(ConnectionAuth::cache_key)
320                .unwrap_or_default();
321            let ssl_part = params
322                .ssl_mode
323                .as_ref()
324                .map(|v| format!("literal={v}"))
325                .or_else(|| {
326                    params
327                        .ssl_mode_secret
328                        .as_ref()
329                        .map(|s| format!("secret={}\0{}", s.name, s.key))
330                })
331                .unwrap_or_default();
332            format!(
333                "{namespace}/{}\0user={user_part}\0pass={pass_part}\0auth={auth_part}\0ssl={ssl_part}",
334                self.identity_key()
335            )
336        } else {
337            format!("{namespace}/{}", self.identity_key())
338        }
339    }
340}
341
342/// Deterministic string representation for a literal/secret field pair.
343///
344/// Uses a `literal=` / `secret=` prefix scheme so that a literal value
345/// can never collide with a secret reference representation.
346fn field_identity_repr(literal: &Option<String>, secret: &Option<SecretKeySelector>) -> String {
347    if let Some(value) = literal {
348        format!("literal={value}")
349    } else if let Some(sel) = secret {
350        format!("secret={}\0{}", sel.name, sel.key)
351    } else {
352        String::new()
353    }
354}
355
/// Default OAuth scope used for Cloud SQL IAM database login tokens.
///
/// Used by `ConnectionAuth` whenever `scope` is omitted from the spec.
pub const DEFAULT_GCP_CLOUD_SQL_LOGIN_SCOPE: &str =
    "https://www.googleapis.com/auth/sqlservice.login";
359
/// Structured connection parameters for building a PostgreSQL connection URL.
///
/// Each field supports either a literal value or a reference to a Kubernetes
/// Secret key. For each parameter, set either the literal field or the
/// corresponding `*Secret` field — not both.
///
/// ```yaml
/// # Zalando pattern — literals for non-sensitive, secrets for credentials
/// params:
///   host: my-cluster-postgres
///   port: 5432
///   dbname: mydb
///   sslMode: require
///   usernameSecret:
///     name: pg-creds
///     key: username
///   passwordSecret:
///     name: pg-creds
///     key: password
/// ```
///
/// All fields are optional at the type level, so the "either/or" rule is not
/// enforced by deserialization. Fields are serialized in camelCase
/// (`hostSecret`, `sslMode`, ...).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionParams {
    /// PostgreSQL host as a literal value.
    #[serde(default)]
    pub host: Option<String>,
    /// PostgreSQL host from a Secret key.
    #[serde(default)]
    pub host_secret: Option<SecretKeySelector>,

    /// Port as a literal value. Defaults to 5432 if neither port nor portSecret is set.
    #[serde(default)]
    pub port: Option<u16>,
    /// Port from a Secret key.
    #[serde(default)]
    pub port_secret: Option<SecretKeySelector>,

    /// Database name as a literal value.
    #[serde(default)]
    pub dbname: Option<String>,
    /// Database name from a Secret key.
    #[serde(default)]
    pub dbname_secret: Option<SecretKeySelector>,

    /// Username as a literal value.
    #[serde(default)]
    pub username: Option<String>,
    /// Username from a Secret key.
    #[serde(default)]
    pub username_secret: Option<SecretKeySelector>,

    /// Password as a literal value (not recommended for production).
    #[serde(default)]
    pub password: Option<String>,
    /// Password from a Secret key (recommended).
    #[serde(default)]
    pub password_secret: Option<SecretKeySelector>,

    /// Provider-backed authentication for connections that use short-lived
    /// credentials instead of a static PostgreSQL password.
    ///
    /// `ConnectionValidationError::AuthWithPassword` describes this as mutually
    /// exclusive with `password` / `passwordSecret`.
    #[serde(default)]
    pub auth: Option<ConnectionAuth>,

    /// SSL mode as a literal value. See `VALID_SSL_MODES` for accepted strings.
    #[serde(default)]
    pub ssl_mode: Option<String>,
    /// SSL mode from a Secret key.
    #[serde(default)]
    pub ssl_mode_secret: Option<SecretKeySelector>,
}
430
/// Provider-backed authentication for `connection.params`.
///
/// Internally tagged: the variant is selected by a `type` field in the
/// serialized object (e.g. `type: gcp_workload_identity`), and the variant's
/// own fields are serialized in camelCase alongside it.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum ConnectionAuth {
    /// Fetch a Cloud SQL IAM database login token using the GKE metadata
    /// server. `username` must be the Cloud SQL PostgreSQL IAM role name
    /// (for service accounts, the email without `.gserviceaccount.com`).
    #[serde(rename = "gcp_workload_identity", rename_all = "camelCase")]
    GcpWorkloadIdentity {
        /// Target Google service account to impersonate before requesting the
        /// Cloud SQL login token. Omit to use the pod's bound identity.
        #[serde(default)]
        impersonate_service_account: Option<String>,
        /// OAuth scope requested for the access token. When omitted,
        /// `DEFAULT_GCP_CLOUD_SQL_LOGIN_SCOPE` is used.
        #[serde(default)]
        scope: Option<String>,
    },
}
449
450impl ConnectionAuth {
451    pub fn gcp_scope(&self) -> &str {
452        match self {
453            Self::GcpWorkloadIdentity { scope, .. } => scope
454                .as_deref()
455                .unwrap_or(DEFAULT_GCP_CLOUD_SQL_LOGIN_SCOPE),
456        }
457    }
458
459    pub fn gcp_impersonate_service_account(&self) -> Option<&str> {
460        match self {
461            Self::GcpWorkloadIdentity {
462                impersonate_service_account,
463                ..
464            } => impersonate_service_account.as_deref(),
465        }
466    }
467
468    fn cache_key(&self) -> String {
469        match self {
470            Self::GcpWorkloadIdentity {
471                impersonate_service_account,
472                scope,
473            } => format!(
474                "gcp_workload_identity\0impersonate={}\0scope={}",
475                impersonate_service_account.as_deref().unwrap_or_default(),
476                scope
477                    .as_deref()
478                    .unwrap_or(DEFAULT_GCP_CLOUD_SQL_LOGIN_SCOPE)
479            ),
480        }
481    }
482}
483
/// Reference to a specific key within a Kubernetes Secret.
///
/// Carries no namespace field.
// NOTE(review): presumably resolved in the policy's own namespace, as
// `SecretReference` documents for itself — confirm against the resolver.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SecretKeySelector {
    /// Name of the Secret.
    pub name: String,
    /// Key within the Secret.
    pub key: String,
}
492
/// Reference to a Kubernetes Secret in the same namespace.
///
/// Only names the Secret; the key to read is supplied separately by the
/// owning spec (for example `ConnectionSpec::secret_key`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SecretReference {
    /// Name of the Secret.
    pub name: String,
}
499
/// A reusable privilege profile (CRD-compatible version).
///
/// This mirrors `pgroles_core::manifest::Profile` but derives `JsonSchema`.
/// Every field is optional and falls back to its `Default` when omitted.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ProfileSpec {
    /// Optional login flag for roles generated from this profile.
    // NOTE(review): presumably maps to PostgreSQL LOGIN/NOLOGIN — confirm in
    // pgroles_core.
    #[serde(default)]
    pub login: Option<bool>,

    /// Optional inherit flag for roles generated from this profile.
    // NOTE(review): presumably maps to PostgreSQL INHERIT/NOINHERIT — confirm.
    #[serde(default)]
    pub inherit: Option<bool>,

    /// Grant templates belonging to this profile.
    #[serde(default)]
    pub grants: Vec<ProfileGrantSpec>,

    /// Default-privilege templates belonging to this profile.
    #[serde(default)]
    pub default_privileges: Vec<DefaultPrivilegeGrantSpec>,
}
517
/// Grant template within a profile.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ProfileGrantSpec {
    /// Privileges to grant.
    pub privileges: Vec<Privilege>,
    /// Target of the grant. Serialized as `object`; `on` is also accepted
    /// when deserializing.
    #[serde(alias = "on")]
    pub object: ProfileObjectTargetSpec,
}
525
/// Object target within a profile.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ProfileObjectTargetSpec {
    /// Kind of object being targeted. Serialized under the key `type`.
    #[serde(rename = "type")]
    pub object_type: ObjectType,
    /// Optional object name.
    // NOTE(review): semantics of an omitted name (e.g. "all objects of this
    // type") are defined by the profile expansion logic — confirm there.
    #[serde(default)]
    pub name: Option<String>,
}
534
/// Default privilege grant within a profile.
///
/// Fields are serialized under their Rust names (e.g. `on_type`); there is no
/// `rename_all` on this struct.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct DefaultPrivilegeGrantSpec {
    /// Optional role name.
    // NOTE(review): presumably the grantee override; an omitted value is
    // likely filled in during profile expansion — confirm in pgroles_core.
    #[serde(default)]
    pub role: Option<String>,
    /// Privileges to set as defaults.
    pub privileges: Vec<Privilege>,
    /// Object type the default privileges apply to.
    pub on_type: ObjectType,
}
543
/// A concrete role definition (CRD-compatible version).
///
/// Only `name` is required; every other attribute is optional and `None` when
/// omitted. Fields are serialized under their Rust names (e.g.
/// `connection_limit`, `password_valid_until`).
// NOTE(review): the boolean attributes presumably correspond to the
// same-named PostgreSQL role attributes (LOGIN, SUPERUSER, CREATEDB, ...) and
// `None` presumably means "leave at the database default" — confirm against
// pgroles_core.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct RoleSpec {
    /// Role name.
    pub name: String,
    #[serde(default)]
    pub login: Option<bool>,
    #[serde(default)]
    pub superuser: Option<bool>,
    #[serde(default)]
    pub createdb: Option<bool>,
    #[serde(default)]
    pub createrole: Option<bool>,
    #[serde(default)]
    pub inherit: Option<bool>,
    #[serde(default)]
    pub replication: Option<bool>,
    #[serde(default)]
    pub bypassrls: Option<bool>,
    #[serde(default)]
    pub connection_limit: Option<i32>,
    /// Optional free-form comment for the role.
    #[serde(default)]
    pub comment: Option<String>,
    /// Password source for this role. Either a reference to an existing Secret
    /// or a request for the operator to generate one.
    #[serde(default)]
    pub password: Option<PasswordSpec>,
    /// Password expiration timestamp (ISO 8601, e.g. "2025-12-31T00:00:00Z").
    /// Stored as an unparsed string; this type does not validate the format.
    #[serde(default)]
    pub password_valid_until: Option<String>,
}
574
/// Password configuration: either reference an existing Secret or have the
/// operator generate a password and create a Secret.
///
/// Exactly one of `secretRef` or `generate` must be set.
///
/// ```yaml
/// # Read from existing Secret:
/// password:
///   secretRef: { name: role-passwords }
///   secretKey: password-user
///
/// # Operator generates and manages a Secret:
/// password:
///   generate:
///     length: 48
/// ```
///
/// Both modes are optional at the type level, so the "exactly one" rule is
/// not enforced by deserialization; `PasswordValidationError::InvalidPasswordMode`
/// exists to report a violation.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PasswordSpec {
    /// Reference to an existing Kubernetes Secret containing the password.
    /// Mutually exclusive with `generate`.
    #[serde(default)]
    pub secret_ref: Option<SecretReference>,
    /// Key within the referenced Secret. Defaults to the role name.
    /// Only used with `secretRef`.
    #[serde(default)]
    pub secret_key: Option<String>,
    /// Generate a random password and store it in a new Kubernetes Secret.
    /// Mutually exclusive with `secretRef`.
    #[serde(default)]
    pub generate: Option<GeneratePasswordSpec>,
}
607
impl PasswordSpec {
    /// Returns true if this is a reference to an existing Secret.
    ///
    /// Only checks that `secret_ref` is set; it does not check that
    /// `generate` is unset, so both predicates can be true for an invalid spec.
    pub fn is_secret_ref(&self) -> bool {
        self.secret_ref.is_some()
    }

    /// Returns true if this is a request to generate a password.
    ///
    /// Only checks that `generate` is set; see the note on `is_secret_ref`.
    pub fn is_generate(&self) -> bool {
        self.generate.is_some()
    }
}
619
/// Configuration for operator-generated passwords.
///
/// All fields are optional; the defaults described below are applied by the
/// code that consumes this spec, not by deserialization (omitted fields
/// deserialize to `None`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GeneratePasswordSpec {
    /// Password length. Defaults to 32. Minimum 16, maximum 128.
    #[serde(default)]
    pub length: Option<u32>,
    /// Override the generated Secret name. Defaults to `{policy}-pgr-{role}`.
    #[serde(default)]
    pub secret_name: Option<String>,
    /// Key within the generated Secret. Defaults to `password`.
    #[serde(default)]
    pub secret_key: Option<String>,
}
634
/// Errors from validating a role's `password` configuration.
///
/// Every variant names the offending role so the message can be surfaced
/// to the user as-is.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PasswordValidationError {
    /// A password was configured for a role that does not have login enabled.
    #[error("role \"{role}\" has a password but login is not enabled")]
    PasswordWithoutLogin { role: String },

    /// The password spec does not set exactly one of `secretRef` / `generate`.
    #[error("role \"{role}\" password must set exactly one of secretRef or generate")]
    InvalidPasswordMode { role: String },

    /// `generate.length` is outside the inclusive range `min..=max`.
    #[error("role \"{role}\" password.generate.length must be between {min} and {max}")]
    InvalidGeneratedLength { role: String, min: u32, max: u32 },

    /// `generate.secretName` is not a valid Kubernetes Secret name.
    #[error(
        "role \"{role}\" password.generate.secretName \"{name}\" is not a valid Kubernetes Secret name"
    )]
    InvalidGeneratedSecretName { role: String, name: String },

    /// A secret key is not a valid Secret data key; `field` is the spec
    /// field path interpolated into the message.
    #[error("role \"{role}\" password {field} \"{key}\" is not a valid Kubernetes Secret data key")]
    InvalidSecretKey {
        role: String,
        field: &'static str,
        key: String,
    },

    /// `generate.secretKey` collides with the key reserved for the SCRAM verifier.
    #[error(
        "role \"{role}\" password.generate.secretKey \"{key}\" is reserved for the SCRAM verifier"
    )]
    ReservedGeneratedSecretKey { role: String, key: String },
}
663
/// Errors from connection spec validation.
///
/// `field` values are interpolated into the message after `connection.params.`
/// (or `connection.params.auth:`), so they are expected to be spec field names.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ConnectionValidationError {
    /// Both `secretRef` and `params` were provided.
    #[error("connection: exactly one of secretRef or params must be set, but both were provided")]
    BothModesSet,

    /// Neither `secretRef` nor `params` was provided.
    #[error("connection: exactly one of secretRef or params must be set, but neither was provided")]
    NeitherModeSet,

    /// A `*Secret` selector is malformed; `detail` completes the sentence
    /// "secret ...".
    #[error("connection.params.{field}: secret {detail}")]
    EmptySecretKeyRef { field: String, detail: String },

    /// The literal `sslMode` is not one of the accepted values.
    // The accepted values are spelled out in the message; keep this list in
    // sync with `VALID_SSL_MODES`.
    #[error(
        "connection.params.sslMode: \"{value}\" is not valid (expected one of: disable, allow, prefer, require, verify-ca, verify-full)"
    )]
    InvalidSslMode { value: String },

    /// A literal parameter value is empty or whitespace-only.
    #[error("connection.params.{field}: literal value must not be empty or whitespace-only")]
    EmptyLiteral { field: String },

    /// Neither the literal field nor its `*Secret` counterpart was set.
    #[error("connection.params: exactly one of {field} or {field}Secret must be set")]
    NeitherFieldSet { field: String },

    /// Both the literal field and its `*Secret` counterpart were set.
    #[error(
        "connection.params: only one of {field} or {field}Secret may be set, but both were provided"
    )]
    BothFieldsSet { field: String },

    /// A field under `auth` is empty or whitespace-only.
    #[error("connection.params.auth: {field} must not be empty or whitespace-only")]
    EmptyAuthField { field: String },

    /// `auth` was combined with `password` / `passwordSecret`.
    #[error("connection.params: password/passwordSecret are mutually exclusive with auth")]
    AuthWithPassword,
}
698
699/// Validate a Kubernetes Secret name per RFC 1123 DNS subdomain rules:
700/// lowercase alpha start, alphanumeric end, body allows lowercase alpha,
701/// digits, `-`, and `.`.
702fn is_valid_secret_name(name: &str) -> bool {
703    if name.is_empty() || name.len() > crate::password::MAX_SECRET_NAME_LENGTH {
704        return false;
705    }
706    let bytes = name.as_bytes();
707    // RFC 1123: must start with a lowercase letter.
708    if !bytes[0].is_ascii_lowercase() {
709        return false;
710    }
711    if !bytes[bytes.len() - 1].is_ascii_lowercase() && !bytes[bytes.len() - 1].is_ascii_digit() {
712        return false;
713    }
714    bytes
715        .iter()
716        .all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || *b == b'-' || *b == b'.')
717}
718
/// Validate a key for a Kubernetes Secret's `data` map.
///
/// A valid key is 1 to 253 bytes long, contains only ASCII alphanumerics,
/// `-`, `_`, and `.`, is not exactly `.`, and does not start with `..`.
/// The length and dot rules reject keys the Kubernetes API server would
/// refuse, so a bad key fails here rather than at Secret creation.
fn is_valid_secret_key(key: &str) -> bool {
    // Kubernetes caps Secret/ConfigMap keys at the DNS subdomain length.
    const MAX_SECRET_KEY_LENGTH: usize = 253;

    !key.is_empty()
        && key.len() <= MAX_SECRET_KEY_LENGTH
        && key != "."
        && !key.starts_with("..")
        && key
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.'))
}
725
726// ---------------------------------------------------------------------------
727// CRD status
728// ---------------------------------------------------------------------------
729
/// Status of a `PostgresPolicy` resource.
///
/// Every field has a serde default, so a partially populated or empty status
/// object deserializes cleanly. Fields are serialized under their Rust
/// (snake_case) names, which the `PostgresPolicy` print columns rely on
/// (e.g. `.status.change_summary.total`).
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct PostgresPolicyStatus {
    /// Standard Kubernetes conditions.
    #[serde(default)]
    pub conditions: Vec<PolicyCondition>,

    /// The `.metadata.generation` that was last successfully reconciled.
    #[serde(default)]
    pub observed_generation: Option<i64>,

    /// The `.metadata.generation` that was last attempted.
    #[serde(default)]
    pub last_attempted_generation: Option<i64>,

    /// ISO 8601 timestamp of the last successful reconciliation.
    #[serde(default)]
    pub last_successful_reconcile_time: Option<String>,

    /// Deprecated alias retained for compatibility with older status readers.
    /// Prefer `last_successful_reconcile_time`.
    #[serde(default)]
    pub last_reconcile_time: Option<String>,

    /// Summary of changes applied in the last reconciliation.
    #[serde(default)]
    pub change_summary: Option<ChangeSummary>,

    /// The reconciliation mode used for the last successful reconcile.
    #[serde(default)]
    pub last_reconcile_mode: Option<PolicyMode>,

    /// Planned SQL for the last successful plan-mode reconcile.
    #[serde(default)]
    pub planned_sql: Option<String>,

    /// Whether `planned_sql` was truncated to fit safely in status.
    #[serde(default)]
    pub planned_sql_truncated: bool,

    /// Canonical identity of the managed database target.
    #[serde(default)]
    pub managed_database_identity: Option<String>,

    /// Roles claimed by this policy's declared ownership scope.
    #[serde(default)]
    pub owned_roles: Vec<String>,

    /// Schemas claimed by this policy's declared ownership scope.
    #[serde(default)]
    pub owned_schemas: Vec<String>,

    /// Last reconcile error message, if any.
    #[serde(default)]
    pub last_error: Option<String>,

    /// Last applied password source version for each password-managed role.
    // NOTE(review): keys are presumably role names — confirm against the
    // reconciler that writes this map.
    #[serde(default)]
    pub applied_password_source_versions: BTreeMap<String, String>,

    /// Consecutive transient operational failures used for exponential backoff.
    #[serde(default)]
    pub transient_failure_count: i32,

    /// Reference to the current/latest plan for this policy.
    #[serde(default)]
    pub current_plan_ref: Option<PlanReference>,
}
797
/// A condition on the `PostgresPolicy` resource.
///
/// `condition_type` and `status` are required; the remaining fields are
/// optional and deserialize to `None` when absent.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PolicyCondition {
    /// Type of condition, e.g. "Ready", "Reconciling", "Degraded", or
    /// "Drifted". Serialized under the key `type`; the `PostgresPolicy` print
    /// columns select the "Ready" and "Drifted" conditions by this value.
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status: "True", "False", or "Unknown".
    pub status: String,

    /// Human-readable reason for the condition.
    #[serde(default)]
    pub reason: Option<String>,

    /// Human-readable message.
    #[serde(default)]
    pub message: Option<String>,

    /// Last time the condition transitioned.
    // NOTE(review): presumably an ISO 8601 timestamp like the other time
    // fields in the status — confirm against the code that sets it.
    #[serde(default)]
    pub last_transition_time: Option<String>,
}
820
/// Reference to a `PostgresPolicyPlan` resource.
///
/// Identifies the plan by name only.
// NOTE(review): presumably resolved in the referencing policy's namespace —
// confirm against the plan lookup code.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PlanReference {
    /// Name of the referenced plan resource.
    pub name: String,
}
826
/// Summary of changes applied during reconciliation.
///
/// Every field is a plain counter. All of them are optional on the wire: a
/// missing field deserializes to `0`.
// The container-level `#[serde(default)]` already makes every field optional
// when deserializing; the per-field attributes repeat that and are kept as-is.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(default)]
pub struct ChangeSummary {
    #[serde(default)]
    pub roles_created: i32,
    #[serde(default)]
    pub roles_altered: i32,
    #[serde(default)]
    pub schemas_created: i32,
    #[serde(default)]
    pub schema_owners_altered: i32,
    #[serde(default)]
    pub roles_dropped: i32,
    #[serde(default)]
    pub sessions_terminated: i32,
    #[serde(default)]
    pub grants_added: i32,
    #[serde(default)]
    pub grants_revoked: i32,
    #[serde(default)]
    pub default_privileges_set: i32,
    #[serde(default)]
    pub default_privileges_revoked: i32,
    #[serde(default)]
    pub members_added: i32,
    #[serde(default)]
    pub members_removed: i32,
    #[serde(default)]
    pub passwords_set: i32,
    // NOTE(review): presumably the sum of the counters above — confirm where
    // the summary is computed. This is the value shown in the "Changes"
    // print column of `PostgresPolicyPlan`.
    #[serde(default)]
    pub total: i32,
}
860
861// ---------------------------------------------------------------------------
862// PostgresPolicyPlan CRD
863// ---------------------------------------------------------------------------
864
/// Spec for a `PostgresPolicyPlan` custom resource.
///
/// Represents a computed reconciliation plan for a `PostgresPolicy`. Plans are
/// created by the operator and may require explicit approval before execution.
// The print-column JSON paths below are camelCase because both this spec and
// `PostgresPolicyPlanStatus` serialize with `rename_all = "camelCase"`. Keep
// the paths in sync when renaming a field.
#[derive(CustomResource, Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[kube(
    group = "pgroles.io",
    version = "v1alpha1",
    kind = "PostgresPolicyPlan",
    namespaced,
    status = "PostgresPolicyPlanStatus",
    shortname = "pgplan",
    category = "pgroles",
    printcolumn = r#"{"name":"Policy","type":"string","jsonPath":".spec.policyRef.name"}"#,
    printcolumn = r#"{"name":"Mode","type":"string","jsonPath":".spec.reconciliationMode"}"#,
    printcolumn = r#"{"name":"Approved","type":"string","jsonPath":".status.conditions[?(@.type==\"Approved\")].status"}"#,
    printcolumn = r#"{"name":"Changes","type":"integer","jsonPath":".status.changeSummary.total"}"#,
    printcolumn = r#"{"name":"SQL Stmts","type":"integer","jsonPath":".status.sqlStatements","priority":1}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"SQL","type":"string","jsonPath":".status.sqlRef.name","priority":1}"#,
    printcolumn = r#"{"name":"Hash","type":"string","jsonPath":".status.sqlHash","priority":1}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct PostgresPolicyPlanSpec {
    /// Reference to the policy that generated this plan.
    pub policy_ref: PolicyPlanRef,
    /// The policy's `.metadata.generation` at plan time.
    pub policy_generation: i64,
    /// Reconciliation mode used for this plan.
    pub reconciliation_mode: CrdReconciliationMode,
    /// Roles that this plan covers. Defaults to an empty list when omitted.
    #[serde(default)]
    pub owned_roles: Vec<String>,
    /// Schemas that this plan covers. Defaults to an empty list when omitted.
    #[serde(default)]
    pub owned_schemas: Vec<String>,
    /// Database identity string for disambiguation in multi-db setups.
    pub managed_database_identity: String,
}
905
/// Reference to the parent `PostgresPolicy` that generated a plan.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PolicyPlanRef {
    /// Name of the parent `PostgresPolicy`.
    // NOTE(review): no namespace is recorded — presumably the policy is in the
    // plan's own namespace; confirm against the controller.
    pub name: String,
}
911
/// Status of a `PostgresPolicyPlan` resource.
///
/// Every field is optional on the wire; field names serialize in camelCase.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PostgresPolicyPlanStatus {
    /// Phase: Pending, Approved, Applying, Applied, Failed, Superseded, or
    /// Rejected. Defaults to Pending.
    #[serde(default)]
    pub phase: PlanPhase,
    /// Standard conditions: Computed, Approved, Applied.
    #[serde(default)]
    pub conditions: Vec<PolicyCondition>,
    /// Summary of changes in this plan.
    #[serde(default)]
    pub change_summary: Option<ChangeSummary>,
    /// Reference to ConfigMap containing the full SQL (for large plans).
    #[serde(default)]
    pub sql_ref: Option<SqlRef>,
    /// Inline SQL for small plans (below a size threshold).
    #[serde(default)]
    pub sql_inline: Option<String>,
    /// True when the SQL preview was truncated because the full redacted SQL
    /// could not be persisted within Kubernetes object limits.
    #[serde(default)]
    pub sql_truncated: bool,
    /// Timestamp when the plan was computed.
    #[serde(default)]
    pub computed_at: Option<String>,
    /// Timestamp when the plan was applied (if applicable).
    #[serde(default)]
    pub applied_at: Option<String>,
    /// Error message if apply failed.
    #[serde(default)]
    pub last_error: Option<String>,
    /// SHA-256 hash of the planned SQL, used to detect duplicate plans.
    /// If a newly computed plan has the same hash as the current pending plan,
    /// the operator can skip creating a redundant plan.
    #[serde(default)]
    pub sql_hash: Option<String>,
    /// Timestamp when the plan entered Applying phase (for stuck detection).
    #[serde(default)]
    pub applying_since: Option<String>,
    /// Timestamp when the plan entered Failed phase (for dedup window).
    #[serde(default)]
    pub failed_at: Option<String>,
    /// Number of SQL statements in the plan (after wildcard expansion).
    /// May be significantly larger than `changeSummary.total` when wildcard
    /// grants expand to many per-object statements.
    #[serde(default)]
    pub sql_statements: Option<i64>,
    /// SHA-256 hash of the redacted SQL preview bytes. This is for storage
    /// integrity only; approval and deduplication continue to use `sql_hash`.
    #[serde(default)]
    pub redacted_sql_hash: Option<String>,
    /// Uncompressed byte length of the redacted SQL preview.
    #[serde(default)]
    pub sql_original_bytes: Option<i64>,
    /// Stored byte length of the SQL preview after inline/truncation/compression.
    #[serde(default)]
    pub sql_stored_bytes: Option<i64>,
}
971
/// Reference to a ConfigMap containing SQL for a plan.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SqlRef {
    /// Name of the referenced ConfigMap.
    pub name: String,
    /// Key to read from the ConfigMap.
    // NOTE(review): presumably a key in the ConfigMap's `data`/`binaryData`
    // holding the SQL text — confirm against the code that writes it.
    pub key: String,
    /// Compression used for the referenced SQL content. Missing means older
    /// uncompressed ConfigMap data.
    // Omitted from the serialized output entirely when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compression: Option<SqlCompression>,
}
982
/// Compression format used for persisted plan SQL previews.
///
/// Variants serialize in lowercase (`gzip`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SqlCompression {
    /// Gzip-compressed content.
    Gzip,
}
989
/// Phase of a `PostgresPolicyPlan`.
///
/// `Pending` is the default. There is no serde rename on this enum, so each
/// variant serializes under its name exactly as written here, which is also
/// what the `Display` impl prints.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
pub enum PlanPhase {
    #[default]
    Pending,
    Approved,
    Applying,
    Applied,
    Failed,
    Superseded,
    Rejected,
}
1002
1003impl std::fmt::Display for PlanPhase {
1004    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1005        match self {
1006            PlanPhase::Pending => write!(f, "Pending"),
1007            PlanPhase::Approved => write!(f, "Approved"),
1008            PlanPhase::Applying => write!(f, "Applying"),
1009            PlanPhase::Applied => write!(f, "Applied"),
1010            PlanPhase::Failed => write!(f, "Failed"),
1011            PlanPhase::Superseded => write!(f, "Superseded"),
1012            PlanPhase::Rejected => write!(f, "Rejected"),
1013        }
1014    }
1015}
1016
1017// ---------------------------------------------------------------------------
1018// Conflict detection
1019// ---------------------------------------------------------------------------
1020
/// Canonical target identity for conflict detection between policies.
///
/// Wraps a single string of the form `<namespace>/<connection identity key>`
/// (see `DatabaseIdentity::from_connection`). Comparison and ordering are
/// those of the wrapped string.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct DatabaseIdentity(String);
1024
1025impl DatabaseIdentity {
1026    /// Create a database identity from the namespace and connection spec's identity key.
1027    pub fn from_connection(namespace: &str, connection: &ConnectionSpec) -> Self {
1028        Self(format!("{namespace}/{}", connection.identity_key()))
1029    }
1030
1031    pub fn as_str(&self) -> &str {
1032        &self.0
1033    }
1034}
1035
/// Conservative ownership claims for a policy.
///
/// Two claim sets conflict when they share at least one role or one schema
/// (see `OwnershipClaims::overlaps`). Both sets are ordered, so summaries
/// built from them list names in sorted order.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OwnershipClaims {
    /// Role names claimed by the policy.
    pub roles: BTreeSet<String>,
    /// Schema names claimed by the policy.
    pub schemas: BTreeSet<String>,
}
1042
1043impl OwnershipClaims {
1044    pub fn overlaps(&self, other: &Self) -> bool {
1045        !self.roles.is_disjoint(&other.roles) || !self.schemas.is_disjoint(&other.schemas)
1046    }
1047
1048    pub fn overlap_summary(&self, other: &Self) -> String {
1049        let overlapping_roles: Vec<_> = self.roles.intersection(&other.roles).cloned().collect();
1050        let overlapping_schemas: Vec<_> =
1051            self.schemas.intersection(&other.schemas).cloned().collect();
1052
1053        let mut parts = Vec::new();
1054        if !overlapping_roles.is_empty() {
1055            parts.push(format!("roles: {}", overlapping_roles.join(", ")));
1056        }
1057        if !overlapping_schemas.is_empty() {
1058            parts.push(format!("schemas: {}", overlapping_schemas.join(", ")));
1059        }
1060
1061        parts.join("; ")
1062    }
1063}
1064
// ---------------------------------------------------------------------------
// Spec validation and Secret name helpers
// ---------------------------------------------------------------------------
1068
1069impl PostgresPolicySpec {
1070    pub fn validate_password_specs(
1071        &self,
1072        policy_name: &str,
1073    ) -> Result<(), PasswordValidationError> {
1074        for role in &self.roles {
1075            let Some(password) = &role.password else {
1076                continue;
1077            };
1078
1079            if role.login != Some(true) {
1080                return Err(PasswordValidationError::PasswordWithoutLogin {
1081                    role: role.name.clone(),
1082                });
1083            }
1084
1085            match (&password.secret_ref, &password.generate) {
1086                (Some(_), None) => {
1087                    let secret_key = password.secret_key.as_deref().unwrap_or(&role.name);
1088                    if !is_valid_secret_key(secret_key) {
1089                        return Err(PasswordValidationError::InvalidSecretKey {
1090                            role: role.name.clone(),
1091                            field: "secretKey",
1092                            key: secret_key.to_string(),
1093                        });
1094                    }
1095                }
1096                (None, Some(generate)) => {
1097                    if let Some(length) = generate.length
1098                        && !(crate::password::MIN_PASSWORD_LENGTH
1099                            ..=crate::password::MAX_PASSWORD_LENGTH)
1100                            .contains(&length)
1101                    {
1102                        return Err(PasswordValidationError::InvalidGeneratedLength {
1103                            role: role.name.clone(),
1104                            min: crate::password::MIN_PASSWORD_LENGTH,
1105                            max: crate::password::MAX_PASSWORD_LENGTH,
1106                        });
1107                    }
1108
1109                    let secret_name =
1110                        crate::password::generated_secret_name(policy_name, &role.name, generate);
1111                    if !is_valid_secret_name(&secret_name) {
1112                        return Err(PasswordValidationError::InvalidGeneratedSecretName {
1113                            role: role.name.clone(),
1114                            name: secret_name,
1115                        });
1116                    }
1117
1118                    let secret_key = crate::password::generated_secret_key(generate);
1119                    if !is_valid_secret_key(&secret_key) {
1120                        return Err(PasswordValidationError::InvalidSecretKey {
1121                            role: role.name.clone(),
1122                            field: "generate.secretKey",
1123                            key: secret_key,
1124                        });
1125                    }
1126                    if secret_key == crate::password::GENERATED_VERIFIER_KEY {
1127                        return Err(PasswordValidationError::ReservedGeneratedSecretKey {
1128                            role: role.name.clone(),
1129                            key: secret_key,
1130                        });
1131                    }
1132                }
1133                _ => {
1134                    return Err(PasswordValidationError::InvalidPasswordMode {
1135                        role: role.name.clone(),
1136                    });
1137                }
1138            }
1139        }
1140
1141        Ok(())
1142    }
1143
1144    /// Validate the connection spec.
1145    ///
1146    /// Ensures exactly one of `secretRef` or `params` is set, and that params
1147    /// mode has all required fields with valid values.
1148    pub fn validate_connection_spec(&self) -> Result<(), ConnectionValidationError> {
1149        let conn = &self.connection;
1150        match (&conn.secret_ref, &conn.params) {
1151            (Some(_), None) => {
1152                // URL mode — valid.
1153                Ok(())
1154            }
1155            (None, Some(params)) => {
1156                // Validate a required field pair: exactly one must be set.
1157                fn validate_required_field(
1158                    field: &str,
1159                    literal: &Option<String>,
1160                    secret: &Option<SecretKeySelector>,
1161                ) -> Result<(), ConnectionValidationError> {
1162                    match (literal, secret) {
1163                        (Some(_), Some(_)) => {
1164                            return Err(ConnectionValidationError::BothFieldsSet {
1165                                field: field.to_string(),
1166                            });
1167                        }
1168                        (None, None) => {
1169                            return Err(ConnectionValidationError::NeitherFieldSet {
1170                                field: field.to_string(),
1171                            });
1172                        }
1173                        (Some(s), None) => {
1174                            if s.trim().is_empty() {
1175                                return Err(ConnectionValidationError::EmptyLiteral {
1176                                    field: field.to_string(),
1177                                });
1178                            }
1179                        }
1180                        (None, Some(sel)) => {
1181                            validate_secret_selector(field, sel)?;
1182                        }
1183                    }
1184                    Ok(())
1185                }
1186
1187                // Validate an optional field pair: at most one may be set.
1188                fn validate_optional_field(
1189                    field: &str,
1190                    literal: &Option<impl AsRef<str>>,
1191                    secret: &Option<SecretKeySelector>,
1192                ) -> Result<(), ConnectionValidationError> {
1193                    let has_literal = literal.is_some();
1194                    if has_literal && secret.is_some() {
1195                        return Err(ConnectionValidationError::BothFieldsSet {
1196                            field: field.to_string(),
1197                        });
1198                    }
1199                    if let Some(s) = literal
1200                        && s.as_ref().trim().is_empty()
1201                    {
1202                        return Err(ConnectionValidationError::EmptyLiteral {
1203                            field: field.to_string(),
1204                        });
1205                    }
1206                    if let Some(sel) = secret {
1207                        validate_secret_selector(field, sel)?;
1208                    }
1209                    Ok(())
1210                }
1211
1212                fn validate_secret_selector(
1213                    field: &str,
1214                    sel: &SecretKeySelector,
1215                ) -> Result<(), ConnectionValidationError> {
1216                    if sel.name.trim().is_empty() {
1217                        return Err(ConnectionValidationError::EmptySecretKeyRef {
1218                            field: field.to_string(),
1219                            detail: "name must not be empty".to_string(),
1220                        });
1221                    }
1222                    if sel.key.trim().is_empty() {
1223                        return Err(ConnectionValidationError::EmptySecretKeyRef {
1224                            field: field.to_string(),
1225                            detail: "key must not be empty".to_string(),
1226                        });
1227                    }
1228                    Ok(())
1229                }
1230
1231                // Required fields: host, dbname, username. Password is only
1232                // required for static-password auth.
1233                validate_required_field("host", &params.host, &params.host_secret)?;
1234                validate_required_field("dbname", &params.dbname, &params.dbname_secret)?;
1235                validate_required_field("username", &params.username, &params.username_secret)?;
1236                if let Some(auth) = &params.auth {
1237                    if params.password.is_some() || params.password_secret.is_some() {
1238                        return Err(ConnectionValidationError::AuthWithPassword);
1239                    }
1240                    match auth {
1241                        ConnectionAuth::GcpWorkloadIdentity {
1242                            impersonate_service_account,
1243                            scope,
1244                        } => {
1245                            if let Some(value) = impersonate_service_account
1246                                && value.trim().is_empty()
1247                            {
1248                                return Err(ConnectionValidationError::EmptyAuthField {
1249                                    field: "impersonateServiceAccount".to_string(),
1250                                });
1251                            }
1252                            if let Some(value) = scope
1253                                && value.trim().is_empty()
1254                            {
1255                                return Err(ConnectionValidationError::EmptyAuthField {
1256                                    field: "scope".to_string(),
1257                                });
1258                            }
1259                        }
1260                    }
1261                } else {
1262                    validate_required_field("password", &params.password, &params.password_secret)?;
1263                }
1264
1265                // Optional fields: port, sslMode.
1266                // Port is u16 so we wrap it for the generic check.
1267                let port_str = params.port.map(|p| p.to_string());
1268                validate_optional_field("port", &port_str, &params.port_secret)?;
1269
1270                validate_optional_field("sslMode", &params.ssl_mode, &params.ssl_mode_secret)?;
1271
1272                // Validate sslMode value if it's a literal.
1273                if let Some(value) = &params.ssl_mode
1274                    && !VALID_SSL_MODES.contains(&value.as_str())
1275                {
1276                    return Err(ConnectionValidationError::InvalidSslMode {
1277                        value: value.clone(),
1278                    });
1279                }
1280
1281                Ok(())
1282            }
1283            (Some(_), Some(_)) => Err(ConnectionValidationError::BothModesSet),
1284            (None, None) => Err(ConnectionValidationError::NeitherModeSet),
1285        }
1286    }
1287
1288    /// All Kubernetes Secret names referenced by this spec.
1289    ///
1290    /// Includes the connection Secret, password `secretRef` Secrets, and
1291    /// generated password Secrets. Used by the controller to trigger
1292    /// reconciliation when any of these Secrets change (or are deleted).
1293    pub fn referenced_secret_names(&self, policy_name: &str) -> BTreeSet<String> {
1294        let mut names = BTreeSet::new();
1295        // Connection secrets — either URL mode or structured params.
1296        self.connection.collect_secret_names(&mut names);
1297        for role in &self.roles {
1298            if let Some(pw) = &role.password {
1299                if let Some(secret_ref) = &pw.secret_ref {
1300                    names.insert(secret_ref.name.clone());
1301                }
1302                if let Some(gen_spec) = &pw.generate {
1303                    let secret_name =
1304                        crate::password::generated_secret_name(policy_name, &role.name, gen_spec);
1305                    names.insert(secret_name);
1306                }
1307            }
1308        }
1309        names
1310    }
1311}
1312
1313// ---------------------------------------------------------------------------
1314// Conversion: CRD spec → core manifest types
1315// ---------------------------------------------------------------------------
1316
1317impl PostgresPolicySpec {
1318    /// Convert the CRD spec into a `PolicyManifest` for use with the core library.
1319    pub fn to_policy_manifest(&self) -> pgroles_core::manifest::PolicyManifest {
1320        use pgroles_core::manifest::{
1321            DefaultPrivilegeGrant, MemberSpec, PolicyManifest, Profile, ProfileGrant,
1322            ProfileObjectTarget, RoleDefinition,
1323        };
1324
1325        let profiles = self
1326            .profiles
1327            .iter()
1328            .map(|(name, spec)| {
1329                let profile = Profile {
1330                    login: spec.login,
1331                    inherit: spec.inherit,
1332                    grants: spec
1333                        .grants
1334                        .iter()
1335                        .map(|g| ProfileGrant {
1336                            privileges: g.privileges.clone(),
1337                            object: ProfileObjectTarget {
1338                                object_type: g.object.object_type,
1339                                name: g.object.name.clone(),
1340                            },
1341                        })
1342                        .collect(),
1343                    default_privileges: spec
1344                        .default_privileges
1345                        .iter()
1346                        .map(|dp| DefaultPrivilegeGrant {
1347                            role: dp.role.clone(),
1348                            privileges: dp.privileges.clone(),
1349                            on_type: dp.on_type,
1350                        })
1351                        .collect(),
1352                };
1353                (name.clone(), profile)
1354            })
1355            .collect();
1356
1357        let roles = self
1358            .roles
1359            .iter()
1360            .map(|r| RoleDefinition {
1361                name: r.name.clone(),
1362                login: r.login,
1363                superuser: r.superuser,
1364                createdb: r.createdb,
1365                createrole: r.createrole,
1366                inherit: r.inherit,
1367                replication: r.replication,
1368                bypassrls: r.bypassrls,
1369                connection_limit: r.connection_limit,
1370                comment: r.comment.clone(),
1371                password: None, // K8s passwords are resolved separately via Secret refs
1372                password_valid_until: r.password_valid_until.clone(),
1373            })
1374            .collect();
1375
1376        // Memberships need MemberSpec conversion — the core type should
1377        // already be compatible since we use it directly in the CRD spec.
1378        // But we need to ensure the serde aliases work. Let's rebuild to be safe.
1379        let memberships = self
1380            .memberships
1381            .iter()
1382            .map(|m| pgroles_core::manifest::Membership {
1383                role: m.role.clone(),
1384                members: m
1385                    .members
1386                    .iter()
1387                    .map(|ms| MemberSpec {
1388                        name: ms.name.clone(),
1389                        inherit: ms.inherit,
1390                        admin: ms.admin,
1391                    })
1392                    .collect(),
1393            })
1394            .collect();
1395
1396        PolicyManifest {
1397            default_owner: self.default_owner.clone(),
1398            auth_providers: Vec::new(),
1399            profiles,
1400            schemas: self.schemas.clone(),
1401            roles,
1402            grants: self.grants.clone(),
1403            default_privileges: self.default_privileges.clone(),
1404            memberships,
1405            retirements: self.retirements.clone(),
1406        }
1407    }
1408
1409    /// Derive a conservative ownership claim set from the policy spec.
1410    ///
1411    /// This intentionally claims all declared/expanded roles and all referenced
1412    /// schemas so overlapping policies are rejected safely.
1413    pub fn ownership_claims(
1414        &self,
1415    ) -> Result<OwnershipClaims, pgroles_core::manifest::ManifestError> {
1416        let manifest = self.to_policy_manifest();
1417        let expanded = pgroles_core::manifest::expand_manifest(&manifest)?;
1418
1419        let mut roles: BTreeSet<String> = expanded.roles.into_iter().map(|r| r.name).collect();
1420        let mut schemas: BTreeSet<String> = self.schemas.iter().map(|s| s.name.clone()).collect();
1421
1422        roles.extend(manifest.retirements.into_iter().map(|r| r.role));
1423        roles.extend(manifest.grants.iter().map(|g| g.role.clone()));
1424        roles.extend(
1425            manifest
1426                .default_privileges
1427                .iter()
1428                .flat_map(|dp| dp.grant.iter().filter_map(|grant| grant.role.clone())),
1429        );
1430        roles.extend(manifest.memberships.iter().map(|m| m.role.clone()));
1431        roles.extend(
1432            manifest
1433                .memberships
1434                .iter()
1435                .flat_map(|m| m.members.iter().map(|member| member.name.clone())),
1436        );
1437
1438        schemas.extend(
1439            manifest
1440                .grants
1441                .iter()
1442                .filter_map(|g| match g.object.object_type {
1443                    ObjectType::Database => None,
1444                    ObjectType::Schema => g.object.name.clone(),
1445                    _ => g.object.schema.clone(),
1446                }),
1447        );
1448        schemas.extend(
1449            manifest
1450                .default_privileges
1451                .iter()
1452                .map(|dp| dp.schema.clone()),
1453        );
1454
1455        Ok(OwnershipClaims { roles, schemas })
1456    }
1457}
1458
1459// ---------------------------------------------------------------------------
1460// Status helpers
1461// ---------------------------------------------------------------------------
1462
1463impl PostgresPolicyStatus {
1464    /// Set a condition, replacing any existing condition of the same type.
1465    ///
1466    /// If the condition's `status` value has not changed, the existing
1467    /// `last_transition_time` is preserved (per Kubernetes condition conventions).
1468    pub fn set_condition(&mut self, new: PolicyCondition) {
1469        if let Some(existing) = self
1470            .conditions
1471            .iter()
1472            .find(|c| c.condition_type == new.condition_type)
1473            && existing.status == new.status
1474        {
1475            // Status unchanged — preserve the existing transition time.
1476            let mut updated = new;
1477            updated.last_transition_time = existing.last_transition_time.clone();
1478            self.conditions
1479                .retain(|c| c.condition_type != updated.condition_type);
1480            self.conditions.push(updated);
1481            return;
1482        }
1483        // New condition or status changed — use the new timestamp.
1484        self.conditions
1485            .retain(|c| c.condition_type != new.condition_type);
1486        self.conditions.push(new);
1487    }
1488}
1489
1490/// Create a timestamp string in ISO 8601 / RFC 3339 format.
1491pub fn now_rfc3339() -> String {
1492    // Use k8s-openapi's chrono re-export or manual formatting.
1493    // For simplicity, use the system time.
1494    use std::time::SystemTime;
1495    let now = SystemTime::now()
1496        .duration_since(SystemTime::UNIX_EPOCH)
1497        .unwrap_or_default();
1498    // Format as simplified ISO 8601
1499    let secs = now.as_secs();
1500    let days = secs / 86400;
1501    let remaining = secs % 86400;
1502    let hours = remaining / 3600;
1503    let minutes = (remaining % 3600) / 60;
1504    let seconds = remaining % 60;
1505
1506    // Convert days since epoch to date (simplified — good enough for status)
1507    let (year, month, day) = days_to_date(days);
1508    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
1509}
1510
/// Convert a day count since the Unix epoch (1970-01-01 is day 0) into a
/// proleptic Gregorian `(year, month, day)` triple, with month and day
/// starting at 1.
///
/// Uses Howard Hinnant's civil-calendar algorithm, which counts years from
/// 1 March so that the leap day falls at the end of the counted year.
pub fn days_to_date(days_since_epoch: u64) -> (u64, u64, u64) {
    // Days in one 400-year Gregorian cycle ("era").
    const DAYS_PER_ERA: u64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: u64 = 719_468;

    let shifted = days_since_epoch + EPOCH_SHIFT;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;

    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;

    // January and February belong to the following civil year.
    let (month, year_carry) = if march_based_month < 10 {
        (march_based_month + 3, 0)
    } else {
        (march_based_month - 9, 1)
    };

    (year_of_era + era * 400 + year_carry, month, day)
}
1526
1527/// Helper to create a "Ready" condition.
1528pub fn ready_condition(status: bool, reason: &str, message: &str) -> PolicyCondition {
1529    PolicyCondition {
1530        condition_type: "Ready".to_string(),
1531        status: if status { "True" } else { "False" }.to_string(),
1532        reason: Some(reason.to_string()),
1533        message: Some(message.to_string()),
1534        last_transition_time: Some(now_rfc3339()),
1535    }
1536}
1537
1538/// Helper to create a "Reconciling" condition.
1539pub fn reconciling_condition(message: &str) -> PolicyCondition {
1540    PolicyCondition {
1541        condition_type: "Reconciling".to_string(),
1542        status: "True".to_string(),
1543        reason: Some("Reconciling".to_string()),
1544        message: Some(message.to_string()),
1545        last_transition_time: Some(now_rfc3339()),
1546    }
1547}
1548
1549/// Helper to create a "Degraded" condition.
1550pub fn degraded_condition(reason: &str, message: &str) -> PolicyCondition {
1551    PolicyCondition {
1552        condition_type: "Degraded".to_string(),
1553        status: "True".to_string(),
1554        reason: Some(reason.to_string()),
1555        message: Some(message.to_string()),
1556        last_transition_time: Some(now_rfc3339()),
1557    }
1558}
1559
1560/// Helper to create a "Paused" condition.
1561pub fn paused_condition(message: &str) -> PolicyCondition {
1562    PolicyCondition {
1563        condition_type: "Paused".to_string(),
1564        status: "True".to_string(),
1565        reason: Some("Suspended".to_string()),
1566        message: Some(message.to_string()),
1567        last_transition_time: Some(now_rfc3339()),
1568    }
1569}
1570
1571/// Helper to create a "Conflict" condition.
1572pub fn conflict_condition(reason: &str, message: &str) -> PolicyCondition {
1573    PolicyCondition {
1574        condition_type: "Conflict".to_string(),
1575        status: "True".to_string(),
1576        reason: Some(reason.to_string()),
1577        message: Some(message.to_string()),
1578        last_transition_time: Some(now_rfc3339()),
1579    }
1580}
1581
1582/// Helper to create a "Drifted" condition.
1583pub fn drifted_condition(status: bool, reason: &str, message: &str) -> PolicyCondition {
1584    PolicyCondition {
1585        condition_type: "Drifted".to_string(),
1586        status: if status { "True" } else { "False" }.to_string(),
1587        reason: Some(reason.to_string()),
1588        message: Some(message.to_string()),
1589        last_transition_time: Some(now_rfc3339()),
1590    }
1591}
1592
1593// ---------------------------------------------------------------------------
1594// Tests
1595// ---------------------------------------------------------------------------
1596
1597#[cfg(test)]
1598mod tests {
1599    use super::*;
1600    use kube::CustomResourceExt;
1601
1602    #[test]
1603    fn crd_generates_valid_schema() {
1604        let crd = PostgresPolicy::crd();
1605        let yaml = serde_yaml::to_string(&crd).expect("CRD should serialize to YAML");
1606        assert!(yaml.contains("pgroles.io"), "group should be pgroles.io");
1607        assert!(yaml.contains("v1alpha1"), "version should be v1alpha1");
1608        assert!(
1609            yaml.contains("PostgresPolicy"),
1610            "kind should be PostgresPolicy"
1611        );
1612        assert!(
1613            yaml.contains("\"mode\"") || yaml.contains(" mode:"),
1614            "schema should declare spec.mode"
1615        );
1616        assert!(
1617            yaml.contains("\"object\"") || yaml.contains(" object:"),
1618            "schema should declare grant object targets using object"
1619        );
1620    }
1621
1622    #[test]
1623    fn spec_to_policy_manifest_roundtrip() {
1624        let spec = PostgresPolicySpec {
1625            connection: ConnectionSpec {
1626                secret_ref: Some(SecretReference {
1627                    name: "pg-secret".to_string(),
1628                }),
1629                secret_key: Some("DATABASE_URL".to_string()),
1630                params: None,
1631            },
1632            interval: "5m".to_string(),
1633            suspend: false,
1634            mode: PolicyMode::Apply,
1635            reconciliation_mode: CrdReconciliationMode::default(),
1636            default_owner: Some("app_owner".to_string()),
1637            profiles: std::collections::HashMap::new(),
1638            schemas: vec![],
1639            roles: vec![RoleSpec {
1640                name: "analytics".to_string(),
1641                login: Some(true),
1642                superuser: None,
1643                createdb: None,
1644                createrole: None,
1645                inherit: None,
1646                replication: None,
1647                bypassrls: None,
1648                connection_limit: None,
1649                comment: Some("test role".to_string()),
1650                password: None,
1651                password_valid_until: None,
1652            }],
1653            grants: vec![],
1654            default_privileges: vec![],
1655            memberships: vec![],
1656            retirements: vec![RoleRetirement {
1657                role: "legacy-app".to_string(),
1658                reassign_owned_to: Some("app_owner".to_string()),
1659                drop_owned: true,
1660                terminate_sessions: true,
1661            }],
1662            approval: None,
1663        };
1664
1665        let manifest = spec.to_policy_manifest();
1666        assert_eq!(manifest.default_owner, Some("app_owner".to_string()));
1667        assert_eq!(manifest.roles.len(), 1);
1668        assert_eq!(manifest.roles[0].name, "analytics");
1669        assert_eq!(manifest.roles[0].login, Some(true));
1670        assert_eq!(manifest.roles[0].comment, Some("test role".to_string()));
1671        assert_eq!(manifest.retirements.len(), 1);
1672        assert_eq!(manifest.retirements[0].role, "legacy-app");
1673        assert_eq!(
1674            manifest.retirements[0].reassign_owned_to.as_deref(),
1675            Some("app_owner")
1676        );
1677        assert!(manifest.retirements[0].drop_owned);
1678        assert!(manifest.retirements[0].terminate_sessions);
1679    }
1680
1681    #[test]
1682    fn spec_to_policy_manifest_preserves_profile_inherit() {
1683        let spec = PostgresPolicySpec {
1684            connection: ConnectionSpec {
1685                secret_ref: Some(SecretReference {
1686                    name: "pg-secret".to_string(),
1687                }),
1688                secret_key: Some("DATABASE_URL".to_string()),
1689                params: None,
1690            },
1691            interval: "5m".to_string(),
1692            suspend: false,
1693            mode: PolicyMode::Apply,
1694            reconciliation_mode: CrdReconciliationMode::default(),
1695            default_owner: None,
1696            profiles: std::collections::HashMap::from([(
1697                "editor".to_string(),
1698                ProfileSpec {
1699                    login: Some(false),
1700                    inherit: Some(false),
1701                    grants: vec![],
1702                    default_privileges: vec![],
1703                },
1704            )]),
1705            schemas: vec![],
1706            roles: vec![],
1707            grants: vec![],
1708            default_privileges: vec![],
1709            memberships: vec![],
1710            retirements: vec![],
1711            approval: None,
1712        };
1713
1714        let manifest = spec.to_policy_manifest();
1715        assert_eq!(manifest.profiles["editor"].login, Some(false));
1716        assert_eq!(manifest.profiles["editor"].inherit, Some(false));
1717    }
1718
1719    #[test]
1720    fn status_set_condition_replaces_existing() {
1721        let mut status = PostgresPolicyStatus::default();
1722
1723        status.set_condition(ready_condition(false, "Pending", "Initial"));
1724        assert_eq!(status.conditions.len(), 1);
1725        assert_eq!(status.conditions[0].status, "False");
1726
1727        status.set_condition(ready_condition(true, "Reconciled", "All good"));
1728        assert_eq!(status.conditions.len(), 1);
1729        assert_eq!(status.conditions[0].status, "True");
1730        assert_eq!(status.conditions[0].reason.as_deref(), Some("Reconciled"));
1731    }
1732
1733    #[test]
1734    fn status_set_condition_adds_new_type() {
1735        let mut status = PostgresPolicyStatus::default();
1736
1737        status.set_condition(ready_condition(true, "OK", "ready"));
1738        status.set_condition(degraded_condition("Error", "something broke"));
1739
1740        assert_eq!(status.conditions.len(), 2);
1741    }
1742
1743    #[test]
1744    fn paused_condition_has_expected_shape() {
1745        let paused = paused_condition("paused by spec");
1746        assert_eq!(paused.condition_type, "Paused");
1747        assert_eq!(paused.status, "True");
1748        assert_eq!(paused.reason.as_deref(), Some("Suspended"));
1749    }
1750
1751    #[test]
1752    fn ownership_claims_include_expanded_roles_and_schemas() {
1753        let mut profiles = std::collections::HashMap::new();
1754        profiles.insert(
1755            "editor".to_string(),
1756            ProfileSpec {
1757                login: Some(false),
1758                inherit: None,
1759                grants: vec![],
1760                default_privileges: vec![],
1761            },
1762        );
1763
1764        let spec = PostgresPolicySpec {
1765            connection: ConnectionSpec {
1766                secret_ref: Some(SecretReference {
1767                    name: "pg-secret".to_string(),
1768                }),
1769                secret_key: Some("DATABASE_URL".to_string()),
1770                params: None,
1771            },
1772            interval: "5m".to_string(),
1773            suspend: false,
1774            mode: PolicyMode::Apply,
1775            reconciliation_mode: CrdReconciliationMode::default(),
1776            default_owner: None,
1777            profiles,
1778            schemas: vec![SchemaBinding {
1779                name: "inventory".to_string(),
1780                profiles: vec!["editor".to_string()],
1781                role_pattern: "{schema}-{profile}".to_string(),
1782                owner: None,
1783            }],
1784            roles: vec![RoleSpec {
1785                name: "app-service".to_string(),
1786                login: Some(true),
1787                superuser: None,
1788                createdb: None,
1789                createrole: None,
1790                inherit: None,
1791                replication: None,
1792                bypassrls: None,
1793                connection_limit: None,
1794                comment: None,
1795                password: None,
1796                password_valid_until: None,
1797            }],
1798            grants: vec![],
1799            default_privileges: vec![],
1800            memberships: vec![],
1801            retirements: vec![RoleRetirement {
1802                role: "legacy-app".to_string(),
1803                reassign_owned_to: None,
1804                drop_owned: false,
1805                terminate_sessions: false,
1806            }],
1807            approval: None,
1808        };
1809
1810        let claims = spec.ownership_claims().unwrap();
1811        assert!(claims.roles.contains("inventory-editor"));
1812        assert!(claims.roles.contains("app-service"));
1813        assert!(claims.roles.contains("legacy-app"));
1814        assert!(claims.schemas.contains("inventory"));
1815    }
1816
1817    #[test]
1818    fn ownership_overlap_summary_reports_roles_and_schemas() {
1819        let mut left = OwnershipClaims::default();
1820        left.roles.insert("analytics".to_string());
1821        left.schemas.insert("reporting".to_string());
1822
1823        let mut right = OwnershipClaims::default();
1824        right.roles.insert("analytics".to_string());
1825        right.schemas.insert("reporting".to_string());
1826        right.schemas.insert("other".to_string());
1827
1828        assert!(left.overlaps(&right));
1829        let summary = left.overlap_summary(&right);
1830        assert!(summary.contains("roles: analytics"));
1831        assert!(summary.contains("schemas: reporting"));
1832    }
1833
1834    #[test]
1835    fn database_identity_uses_namespace_and_identity_key() {
1836        let conn = ConnectionSpec {
1837            secret_ref: Some(SecretReference {
1838                name: "db-creds".to_string(),
1839            }),
1840            secret_key: Some("DATABASE_URL".to_string()),
1841            params: None,
1842        };
1843        let identity = DatabaseIdentity::from_connection("prod", &conn);
1844        assert_eq!(identity.as_str(), "prod/db-creds/DATABASE_URL");
1845    }
1846
1847    #[test]
1848    fn identity_key_same_database_different_users_are_equal() {
1849        // Two policies targeting the same database but with different users
1850        // should have the SAME identity key (for locking/conflict detection).
1851        let user_a = ConnectionSpec {
1852            secret_ref: None,
1853            secret_key: None,
1854            params: Some(ConnectionParams {
1855                host: Some("my-host".into()),
1856                host_secret: None,
1857                port: None,
1858                port_secret: None,
1859                dbname: Some("mydb".into()),
1860                dbname_secret: None,
1861                username: Some("alice".into()),
1862                username_secret: None,
1863                password: Some("pass-a".into()),
1864                password_secret: None,
1865                auth: None,
1866                ssl_mode: None,
1867                ssl_mode_secret: None,
1868            }),
1869        };
1870        let user_b = ConnectionSpec {
1871            secret_ref: None,
1872            secret_key: None,
1873            params: Some(ConnectionParams {
1874                host: Some("my-host".into()),
1875                host_secret: None,
1876                port: None,
1877                port_secret: None,
1878                dbname: Some("mydb".into()),
1879                dbname_secret: None,
1880                username: Some("bob".into()),
1881                username_secret: None,
1882                password: Some("pass-b".into()),
1883                password_secret: None,
1884                auth: None,
1885                ssl_mode: None,
1886                ssl_mode_secret: None,
1887            }),
1888        };
1889
1890        assert_eq!(
1891            user_a.identity_key(),
1892            user_b.identity_key(),
1893            "same database with different users should have the same identity key"
1894        );
1895        // But cache keys should differ (different credentials = different pool).
1896        assert_ne!(
1897            user_a.cache_key("default"),
1898            user_b.cache_key("default"),
1899            "different credentials should produce different cache keys"
1900        );
1901    }
1902
1903    #[test]
1904    fn cache_key_no_collision_between_literal_and_secret_username() {
1905        // A literal username containing "secret=" should not collide with a
1906        // real secret reference in the cache key.
1907        let literal_conn = ConnectionSpec {
1908            secret_ref: None,
1909            secret_key: None,
1910            params: Some(ConnectionParams {
1911                host: Some("my-host".into()),
1912                host_secret: None,
1913                port: None,
1914                port_secret: None,
1915                dbname: Some("mydb".into()),
1916                dbname_secret: None,
1917                username: Some("secret=creds\0password".into()),
1918                username_secret: None,
1919                password: Some("pass".into()),
1920                password_secret: None,
1921                auth: None,
1922                ssl_mode: None,
1923                ssl_mode_secret: None,
1924            }),
1925        };
1926        let secret_conn = ConnectionSpec {
1927            secret_ref: None,
1928            secret_key: None,
1929            params: Some(ConnectionParams {
1930                host: Some("my-host".into()),
1931                host_secret: None,
1932                port: None,
1933                port_secret: None,
1934                dbname: Some("mydb".into()),
1935                dbname_secret: None,
1936                username: None,
1937                username_secret: Some(SecretKeySelector {
1938                    name: "creds".into(),
1939                    key: "password".into(),
1940                }),
1941                password: Some("pass".into()),
1942                password_secret: None,
1943                auth: None,
1944                ssl_mode: None,
1945                ssl_mode_secret: None,
1946            }),
1947        };
1948
1949        assert_ne!(
1950            literal_conn.cache_key("default"),
1951            secret_conn.cache_key("default"),
1952            "literal and secret ref should produce different cache keys"
1953        );
1954    }
1955
1956    #[test]
1957    fn cache_key_includes_ssl_mode() {
1958        let conn_no_ssl = ConnectionSpec {
1959            secret_ref: None,
1960            secret_key: None,
1961            params: Some(ConnectionParams {
1962                host: Some("host".into()),
1963                host_secret: None,
1964                port: None,
1965                port_secret: None,
1966                dbname: Some("db".into()),
1967                dbname_secret: None,
1968                username: Some("user".into()),
1969                username_secret: None,
1970                password: Some("pass".into()),
1971                password_secret: None,
1972                auth: None,
1973                ssl_mode: None,
1974                ssl_mode_secret: None,
1975            }),
1976        };
1977        let conn_with_ssl = ConnectionSpec {
1978            secret_ref: None,
1979            secret_key: None,
1980            params: Some(ConnectionParams {
1981                host: Some("host".into()),
1982                host_secret: None,
1983                port: None,
1984                port_secret: None,
1985                dbname: Some("db".into()),
1986                dbname_secret: None,
1987                username: Some("user".into()),
1988                username_secret: None,
1989                password: Some("pass".into()),
1990                password_secret: None,
1991                auth: None,
1992                ssl_mode: Some("require".into()),
1993                ssl_mode_secret: None,
1994            }),
1995        };
1996
1997        assert_ne!(
1998            conn_no_ssl.cache_key("ns"),
1999            conn_with_ssl.cache_key("ns"),
2000            "cache key should differ when sslMode is present"
2001        );
2002    }
2003
2004    #[test]
2005    fn validate_connection_rejects_empty_literal_host() {
2006        let spec = spec_with_connection(ConnectionSpec {
2007            secret_ref: None,
2008            secret_key: None,
2009            params: Some(ConnectionParams {
2010                host: Some("".into()),
2011                host_secret: None,
2012                port: None,
2013                port_secret: None,
2014                dbname: Some("mydb".into()),
2015                dbname_secret: None,
2016                username: Some("user".into()),
2017                username_secret: None,
2018                password: Some("pass".into()),
2019                password_secret: None,
2020                auth: None,
2021                ssl_mode: None,
2022                ssl_mode_secret: None,
2023            }),
2024        });
2025
2026        let err = spec.validate_connection_spec().unwrap_err();
2027        assert!(
2028            matches!(err, ConnectionValidationError::EmptyLiteral { ref field } if field == "host"),
2029            "expected EmptyLiteral for host, got: {err}"
2030        );
2031    }
2032
2033    #[test]
2034    fn validate_connection_rejects_whitespace_literal_dbname() {
2035        let spec = spec_with_connection(ConnectionSpec {
2036            secret_ref: None,
2037            secret_key: None,
2038            params: Some(ConnectionParams {
2039                host: Some("host".into()),
2040                host_secret: None,
2041                port: None,
2042                port_secret: None,
2043                dbname: Some("  ".into()),
2044                dbname_secret: None,
2045                username: Some("user".into()),
2046                username_secret: None,
2047                password: Some("pass".into()),
2048                password_secret: None,
2049                auth: None,
2050                ssl_mode: None,
2051                ssl_mode_secret: None,
2052            }),
2053        });
2054
2055        let err = spec.validate_connection_spec().unwrap_err();
2056        assert!(
2057            matches!(err, ConnectionValidationError::EmptyLiteral { ref field } if field == "dbname"),
2058            "expected EmptyLiteral for dbname, got: {err}"
2059        );
2060    }
2061
2062    /// Helper to build a minimal spec with the given connection and no roles/grants.
2063    fn spec_with_connection(connection: ConnectionSpec) -> PostgresPolicySpec {
2064        PostgresPolicySpec {
2065            connection,
2066            interval: "5m".into(),
2067            suspend: false,
2068            mode: PolicyMode::Apply,
2069            reconciliation_mode: CrdReconciliationMode::default(),
2070            default_owner: None,
2071            profiles: Default::default(),
2072            schemas: vec![],
2073            roles: vec![],
2074            grants: vec![],
2075            default_privileges: vec![],
2076            memberships: vec![],
2077            retirements: vec![],
2078            approval: None,
2079        }
2080    }
2081
2082    fn url_mode_connection() -> ConnectionSpec {
2083        ConnectionSpec {
2084            secret_ref: Some(SecretReference {
2085                name: "pg-creds".into(),
2086            }),
2087            secret_key: Some("DATABASE_URL".into()),
2088            params: None,
2089        }
2090    }
2091
2092    fn params_mode_connection() -> ConnectionSpec {
2093        ConnectionSpec {
2094            secret_ref: None,
2095            secret_key: None,
2096            params: Some(ConnectionParams {
2097                host: Some("my-postgres".into()),
2098                host_secret: None,
2099                port: None,
2100                port_secret: None,
2101                dbname: Some("mydb".into()),
2102                dbname_secret: None,
2103                username: None,
2104                username_secret: Some(SecretKeySelector {
2105                    name: "pg-creds".into(),
2106                    key: "username".into(),
2107                }),
2108                password: None,
2109                password_secret: Some(SecretKeySelector {
2110                    name: "pg-creds".into(),
2111                    key: "password".into(),
2112                }),
2113                auth: None,
2114                ssl_mode: None,
2115                ssl_mode_secret: None,
2116            }),
2117        }
2118    }
2119
2120    // -- Connection validation tests -----------------------------------------
2121
2122    #[test]
2123    fn validate_connection_accepts_url_mode() {
2124        let spec = spec_with_connection(url_mode_connection());
2125        assert!(spec.validate_connection_spec().is_ok());
2126    }
2127
2128    #[test]
2129    fn validate_connection_accepts_params_mode() {
2130        let spec = spec_with_connection(params_mode_connection());
2131        assert!(spec.validate_connection_spec().is_ok());
2132    }
2133
2134    #[test]
2135    fn validate_connection_rejects_both_modes_set() {
2136        let spec = spec_with_connection(ConnectionSpec {
2137            secret_ref: Some(SecretReference {
2138                name: "pg-creds".into(),
2139            }),
2140            secret_key: None,
2141            params: Some(ConnectionParams {
2142                host: Some("host".into()),
2143                host_secret: None,
2144                port: None,
2145                port_secret: None,
2146                dbname: Some("db".into()),
2147                dbname_secret: None,
2148                username: Some("user".into()),
2149                username_secret: None,
2150                password: Some("pass".into()),
2151                password_secret: None,
2152                auth: None,
2153                ssl_mode: None,
2154                ssl_mode_secret: None,
2155            }),
2156        });
2157        assert!(matches!(
2158            spec.validate_connection_spec(),
2159            Err(ConnectionValidationError::BothModesSet)
2160        ));
2161    }
2162
2163    #[test]
2164    fn validate_connection_rejects_neither_mode_set() {
2165        let spec = spec_with_connection(ConnectionSpec {
2166            secret_ref: None,
2167            secret_key: None,
2168            params: None,
2169        });
2170        assert!(spec.validate_connection_spec().is_err());
2171    }
2172
2173    #[test]
2174    fn validate_connection_rejects_invalid_ssl_mode() {
2175        let spec = spec_with_connection(ConnectionSpec {
2176            secret_ref: None,
2177            secret_key: None,
2178            params: Some(ConnectionParams {
2179                host: Some("host".into()),
2180                host_secret: None,
2181                port: None,
2182                port_secret: None,
2183                dbname: Some("db".into()),
2184                dbname_secret: None,
2185                username: Some("user".into()),
2186                username_secret: None,
2187                password: Some("pass".into()),
2188                password_secret: None,
2189                auth: None,
2190                ssl_mode: Some("invalid-mode".into()),
2191                ssl_mode_secret: None,
2192            }),
2193        });
2194        assert!(spec.validate_connection_spec().is_err());
2195    }
2196
2197    #[test]
2198    fn validate_connection_accepts_gcp_workload_identity_without_password() {
2199        let spec = spec_with_connection(ConnectionSpec {
2200            secret_ref: None,
2201            secret_key: None,
2202            params: Some(ConnectionParams {
2203                host: Some("10.0.0.5".into()),
2204                host_secret: None,
2205                port: None,
2206                port_secret: None,
2207                dbname: Some("discovery".into()),
2208                dbname_secret: None,
2209                username: Some("pgroles-operator@my-project.iam".into()),
2210                username_secret: None,
2211                password: None,
2212                password_secret: None,
2213                auth: Some(ConnectionAuth::GcpWorkloadIdentity {
2214                    impersonate_service_account: None,
2215                    scope: None,
2216                }),
2217                ssl_mode: None,
2218                ssl_mode_secret: None,
2219            }),
2220        });
2221
2222        assert!(spec.validate_connection_spec().is_ok());
2223        assert!(spec.referenced_secret_names("policy").is_empty());
2224    }
2225
2226    #[test]
2227    fn validate_connection_rejects_gcp_workload_identity_with_password() {
2228        let spec = spec_with_connection(ConnectionSpec {
2229            secret_ref: None,
2230            secret_key: None,
2231            params: Some(ConnectionParams {
2232                host: Some("10.0.0.5".into()),
2233                host_secret: None,
2234                port: None,
2235                port_secret: None,
2236                dbname: Some("discovery".into()),
2237                dbname_secret: None,
2238                username: Some("pgroles-operator@my-project.iam".into()),
2239                username_secret: None,
2240                password: Some("static-password".into()),
2241                password_secret: None,
2242                auth: Some(ConnectionAuth::GcpWorkloadIdentity {
2243                    impersonate_service_account: None,
2244                    scope: None,
2245                }),
2246                ssl_mode: None,
2247                ssl_mode_secret: None,
2248            }),
2249        });
2250
2251        assert!(matches!(
2252            spec.validate_connection_spec(),
2253            Err(ConnectionValidationError::AuthWithPassword)
2254        ));
2255    }
2256
2257    #[test]
2258    fn validate_connection_rejects_empty_gcp_auth_fields() {
2259        let spec = spec_with_connection(ConnectionSpec {
2260            secret_ref: None,
2261            secret_key: None,
2262            params: Some(ConnectionParams {
2263                host: Some("10.0.0.5".into()),
2264                host_secret: None,
2265                port: None,
2266                port_secret: None,
2267                dbname: Some("discovery".into()),
2268                dbname_secret: None,
2269                username: Some("pgroles-operator@my-project.iam".into()),
2270                username_secret: None,
2271                password: None,
2272                password_secret: None,
2273                auth: Some(ConnectionAuth::GcpWorkloadIdentity {
2274                    impersonate_service_account: Some(" ".into()),
2275                    scope: None,
2276                }),
2277                ssl_mode: None,
2278                ssl_mode_secret: None,
2279            }),
2280        });
2281
2282        assert!(matches!(
2283            spec.validate_connection_spec(),
2284            Err(ConnectionValidationError::EmptyAuthField { ref field })
2285                if field == "impersonateServiceAccount"
2286        ));
2287    }
2288
2289    #[test]
2290    fn validate_connection_accepts_valid_ssl_modes() {
2291        for mode in &[
2292            "disable",
2293            "allow",
2294            "prefer",
2295            "require",
2296            "verify-ca",
2297            "verify-full",
2298        ] {
2299            let spec = spec_with_connection(ConnectionSpec {
2300                secret_ref: None,
2301                secret_key: None,
2302                params: Some(ConnectionParams {
2303                    host: Some("host".into()),
2304                    host_secret: None,
2305                    port: None,
2306                    port_secret: None,
2307                    dbname: Some("db".into()),
2308                    dbname_secret: None,
2309                    username: Some("user".into()),
2310                    username_secret: None,
2311                    password: Some("pass".into()),
2312                    password_secret: None,
2313                    auth: None,
2314                    ssl_mode: Some((*mode).into()),
2315                    ssl_mode_secret: None,
2316                }),
2317            });
2318            assert!(
2319                spec.validate_connection_spec().is_ok(),
2320                "sslMode '{mode}' should be accepted"
2321            );
2322        }
2323    }
2324
2325    #[test]
2326    fn validate_connection_rejects_empty_secret_name() {
2327        let spec = spec_with_connection(ConnectionSpec {
2328            secret_ref: None,
2329            secret_key: None,
2330            params: Some(ConnectionParams {
2331                host: Some("host".into()),
2332                host_secret: None,
2333                port: None,
2334                port_secret: None,
2335                dbname: Some("db".into()),
2336                dbname_secret: None,
2337                username: None,
2338                username_secret: Some(SecretKeySelector {
2339                    name: "".into(),
2340                    key: "username".into(),
2341                }),
2342                password: Some("pass".into()),
2343                password_secret: None,
2344                auth: None,
2345                ssl_mode: None,
2346                ssl_mode_secret: None,
2347            }),
2348        });
2349        assert!(spec.validate_connection_spec().is_err());
2350    }
2351
2352    #[test]
2353    fn validate_connection_rejects_both_literal_and_secret_for_same_field() {
2354        let spec = spec_with_connection(ConnectionSpec {
2355            secret_ref: None,
2356            secret_key: None,
2357            params: Some(ConnectionParams {
2358                host: Some("host".into()),
2359                host_secret: Some(SecretKeySelector {
2360                    name: "s".into(),
2361                    key: "k".into(),
2362                }),
2363                port: None,
2364                port_secret: None,
2365                dbname: Some("db".into()),
2366                dbname_secret: None,
2367                username: Some("user".into()),
2368                username_secret: None,
2369                password: Some("pass".into()),
2370                password_secret: None,
2371                auth: None,
2372                ssl_mode: None,
2373                ssl_mode_secret: None,
2374            }),
2375        });
2376        assert!(matches!(
2377            spec.validate_connection_spec(),
2378            Err(ConnectionValidationError::BothFieldsSet { ref field }) if field == "host"
2379        ));
2380    }
2381
2382    #[test]
2383    fn validate_connection_rejects_neither_literal_nor_secret_for_required_field() {
2384        let spec = spec_with_connection(ConnectionSpec {
2385            secret_ref: None,
2386            secret_key: None,
2387            params: Some(ConnectionParams {
2388                host: None,
2389                host_secret: None,
2390                port: None,
2391                port_secret: None,
2392                dbname: Some("db".into()),
2393                dbname_secret: None,
2394                username: Some("user".into()),
2395                username_secret: None,
2396                password: Some("pass".into()),
2397                password_secret: None,
2398                auth: None,
2399                ssl_mode: None,
2400                ssl_mode_secret: None,
2401            }),
2402        });
2403        assert!(matches!(
2404            spec.validate_connection_spec(),
2405            Err(ConnectionValidationError::NeitherFieldSet { ref field }) if field == "host"
2406        ));
2407    }
2408
2409    // -- ConnectionSpec backward compatibility --------------------------------
2410
2411    #[test]
2412    fn connection_spec_backward_compat_url_mode() {
2413        // The old format with required secretRef should still deserialize.
2414        let yaml = r#"
2415secretRef:
2416  name: pg-creds
2417secretKey: DATABASE_URL
2418"#;
2419        let conn: ConnectionSpec = serde_yaml::from_str(yaml).unwrap();
2420        assert!(conn.secret_ref.is_some());
2421        assert_eq!(conn.effective_secret_key(), "DATABASE_URL");
2422        assert!(conn.params.is_none());
2423    }
2424
2425    #[test]
2426    fn connection_spec_backward_compat_default_secret_key() {
2427        let yaml = r#"
2428secretRef:
2429  name: pg-creds
2430"#;
2431        let conn: ConnectionSpec = serde_yaml::from_str(yaml).unwrap();
2432        assert_eq!(conn.effective_secret_key(), "DATABASE_URL");
2433    }
2434
2435    #[test]
2436    fn connection_spec_params_mode_deserializes_keycloak_style() {
2437        let yaml = r#"
2438params:
2439  host: my-postgres
2440  port: 5432
2441  dbname: mydb
2442  usernameSecret:
2443    name: creds
2444    key: username
2445  passwordSecret:
2446    name: creds
2447    key: password
2448  sslMode: require
2449"#;
2450        let conn: ConnectionSpec = serde_yaml::from_str(yaml).unwrap();
2451        assert!(conn.secret_ref.is_none());
2452        let params = conn.params.unwrap();
2453        assert_eq!(params.host.as_deref(), Some("my-postgres"));
2454        assert_eq!(params.port, Some(5432));
2455        assert!(params.username_secret.is_some());
2456        assert_eq!(params.username_secret.as_ref().unwrap().name, "creds");
2457        assert_eq!(params.ssl_mode.as_deref(), Some("require"));
2458    }
2459
2460    #[test]
2461    fn connection_spec_params_mode_deserializes_gcp_workload_identity_auth() {
2462        let yaml = r#"
2463params:
2464  host: 10.0.0.5
2465  port: 5432
2466  dbname: discovery
2467  username: pgroles-operator@my-project.iam
2468  auth:
2469    type: gcp_workload_identity
2470    impersonateServiceAccount: target@other-project.iam.gserviceaccount.com
2471    scope: https://example.com/custom-scope
2472"#;
2473        let conn: ConnectionSpec = serde_yaml::from_str(yaml).unwrap();
2474        let params = conn.params.as_ref().unwrap();
2475        let auth = params.auth.as_ref().expect("auth should deserialize");
2476
2477        assert!(params.password.is_none());
2478        assert_eq!(auth.gcp_scope(), "https://example.com/custom-scope");
2479        assert_eq!(
2480            auth.gcp_impersonate_service_account(),
2481            Some("target@other-project.iam.gserviceaccount.com")
2482        );
2483        assert!(conn.cache_key("prod").contains("gcp_workload_identity"));
2484
2485        let spec = spec_with_connection(conn);
2486        assert!(spec.validate_connection_spec().is_ok());
2487    }
2488
2489    #[test]
2490    fn connection_spec_params_mode_all_secrets() {
2491        // CNPG/PGO pattern — everything from one secret.
2492        let yaml = r#"
2493params:
2494  hostSecret:
2495    name: cluster-app
2496    key: host
2497  portSecret:
2498    name: cluster-app
2499    key: port
2500  dbnameSecret:
2501    name: cluster-app
2502    key: dbname
2503  usernameSecret:
2504    name: cluster-app
2505    key: user
2506  passwordSecret:
2507    name: cluster-app
2508    key: password
2509"#;
2510        let conn: ConnectionSpec = serde_yaml::from_str(yaml).unwrap();
2511        let params = conn.params.unwrap();
2512        assert!(params.host.is_none());
2513        assert!(params.host_secret.is_some());
2514        assert_eq!(params.host_secret.as_ref().unwrap().name, "cluster-app");
2515        assert!(params.port.is_none());
2516        assert!(params.port_secret.is_some());
2517    }
2518
2519    // -- referenced_secret_names with params mode ----------------------------
2520
2521    #[test]
2522    fn referenced_secret_names_includes_params_secrets() {
2523        let spec = spec_with_connection(params_mode_connection());
2524        let names = spec.referenced_secret_names("test-policy");
2525        assert!(
2526            names.contains("pg-creds"),
2527            "should include the credential secret from params"
2528        );
2529    }
2530
2531    #[test]
2532    fn referenced_secret_names_deduplicates_across_modes() {
2533        // Same secret name used in both connection and password secretRef.
2534        let mut spec = spec_with_connection(params_mode_connection());
2535        spec.roles = vec![RoleSpec {
2536            name: "app".into(),
2537            login: Some(true),
2538            password: Some(PasswordSpec {
2539                secret_ref: Some(SecretReference {
2540                    name: "pg-creds".into(),
2541                }),
2542                secret_key: Some("app-password".into()),
2543                generate: None,
2544            }),
2545            password_valid_until: None,
2546            superuser: None,
2547            createdb: None,
2548            createrole: None,
2549            inherit: None,
2550            replication: None,
2551            bypassrls: None,
2552            connection_limit: None,
2553            comment: None,
2554        }];
2555        let names = spec.referenced_secret_names("test-policy");
2556        // pg-creds appears in both connection params and password — should be deduped.
2557        assert_eq!(
2558            names.iter().filter(|n| *n == "pg-creds").count(),
2559            1,
2560            "BTreeSet should deduplicate"
2561        );
2562    }
2563
2564    // -- ConnectionParams port default ---------------------------------------
2565
2566    #[test]
2567    fn connection_params_port_defaults_to_none() {
2568        let yaml = r#"
2569params:
2570  host: my-host
2571  dbname: mydb
2572  username: user
2573  password: pass
2574"#;
2575        let conn: ConnectionSpec = serde_yaml::from_str(yaml).unwrap();
2576        let params = conn.params.unwrap();
2577        assert!(
2578            params.port.is_none(),
2579            "port should default to None (resolved as 5432 at runtime)"
2580        );
2581        assert!(
2582            params.port_secret.is_none(),
2583            "portSecret should also default to None"
2584        );
2585    }
2586
2587    #[test]
2588    fn now_rfc3339_produces_valid_format() {
2589        let ts = now_rfc3339();
2590        // Should match YYYY-MM-DDTHH:MM:SSZ
2591        assert!(ts.len() == 20, "expected 20 chars, got {}: {ts}", ts.len());
2592        assert!(ts.ends_with('Z'), "should end with Z: {ts}");
2593        assert_eq!(&ts[4..5], "-", "should have dash at pos 4: {ts}");
2594        assert_eq!(&ts[10..11], "T", "should have T at pos 10: {ts}");
2595    }
2596
2597    #[test]
2598    fn ready_condition_true_has_expected_shape() {
2599        let cond = ready_condition(true, "Reconciled", "All changes applied");
2600        assert_eq!(cond.condition_type, "Ready");
2601        assert_eq!(cond.status, "True");
2602        assert_eq!(cond.reason.as_deref(), Some("Reconciled"));
2603        assert_eq!(cond.message.as_deref(), Some("All changes applied"));
2604        assert!(cond.last_transition_time.is_some());
2605    }
2606
2607    #[test]
2608    fn ready_condition_false_has_expected_shape() {
2609        let cond = ready_condition(false, "InvalidSpec", "bad manifest");
2610        assert_eq!(cond.condition_type, "Ready");
2611        assert_eq!(cond.status, "False");
2612        assert_eq!(cond.reason.as_deref(), Some("InvalidSpec"));
2613        assert_eq!(cond.message.as_deref(), Some("bad manifest"));
2614    }
2615
2616    #[test]
2617    fn degraded_condition_has_expected_shape() {
2618        let cond = degraded_condition("InvalidSpec", "expansion failed");
2619        assert_eq!(cond.condition_type, "Degraded");
2620        assert_eq!(cond.status, "True");
2621        assert_eq!(cond.reason.as_deref(), Some("InvalidSpec"));
2622        assert_eq!(cond.message.as_deref(), Some("expansion failed"));
2623        assert!(cond.last_transition_time.is_some());
2624    }
2625
2626    #[test]
2627    fn reconciling_condition_has_expected_shape() {
2628        let cond = reconciling_condition("Reconciliation in progress");
2629        assert_eq!(cond.condition_type, "Reconciling");
2630        assert_eq!(cond.status, "True");
2631        assert_eq!(cond.reason.as_deref(), Some("Reconciling"));
2632        assert_eq!(cond.message.as_deref(), Some("Reconciliation in progress"));
2633        assert!(cond.last_transition_time.is_some());
2634    }
2635
2636    #[test]
2637    fn conflict_condition_has_expected_shape() {
2638        let cond = conflict_condition("ConflictingPolicy", "overlaps with ns/other");
2639        assert_eq!(cond.condition_type, "Conflict");
2640        assert_eq!(cond.status, "True");
2641        assert_eq!(cond.reason.as_deref(), Some("ConflictingPolicy"));
2642        assert_eq!(cond.message.as_deref(), Some("overlaps with ns/other"));
2643        assert!(cond.last_transition_time.is_some());
2644    }
2645
2646    #[test]
2647    fn ownership_claims_no_overlap() {
2648        let mut left = OwnershipClaims::default();
2649        left.roles.insert("analytics".to_string());
2650        left.schemas.insert("reporting".to_string());
2651
2652        let mut right = OwnershipClaims::default();
2653        right.roles.insert("billing".to_string());
2654        right.schemas.insert("payments".to_string());
2655
2656        assert!(!left.overlaps(&right));
2657        let summary = left.overlap_summary(&right);
2658        assert!(summary.is_empty());
2659    }
2660
2661    #[test]
2662    fn ownership_claims_partial_role_overlap() {
2663        let mut left = OwnershipClaims::default();
2664        left.roles.insert("analytics".to_string());
2665        left.roles.insert("reporting-viewer".to_string());
2666
2667        let mut right = OwnershipClaims::default();
2668        right.roles.insert("analytics".to_string());
2669        right.roles.insert("other-role".to_string());
2670
2671        assert!(left.overlaps(&right));
2672        let summary = left.overlap_summary(&right);
2673        assert!(summary.contains("roles: analytics"));
2674        assert!(!summary.contains("schemas"));
2675    }
2676
2677    #[test]
2678    fn ownership_claims_empty_is_disjoint() {
2679        let left = OwnershipClaims::default();
2680        let right = OwnershipClaims::default();
2681        assert!(!left.overlaps(&right));
2682    }
2683
2684    #[test]
2685    fn database_identity_equality() {
2686        let conn_a = ConnectionSpec {
2687            secret_ref: Some(SecretReference {
2688                name: "db-creds".to_string(),
2689            }),
2690            secret_key: Some("DATABASE_URL".to_string()),
2691            params: None,
2692        };
2693        let a = DatabaseIdentity::from_connection("prod", &conn_a);
2694        let b = DatabaseIdentity::from_connection("prod", &conn_a);
2695        let c = DatabaseIdentity::from_connection("staging", &conn_a);
2696        assert_eq!(a, b);
2697        assert_ne!(a, c);
2698    }
2699
2700    #[test]
2701    fn database_identity_different_key() {
2702        let conn_a = ConnectionSpec {
2703            secret_ref: Some(SecretReference {
2704                name: "db-creds".to_string(),
2705            }),
2706            secret_key: Some("DATABASE_URL".to_string()),
2707            params: None,
2708        };
2709        let conn_b = ConnectionSpec {
2710            secret_ref: Some(SecretReference {
2711                name: "db-creds".to_string(),
2712            }),
2713            secret_key: Some("CUSTOM_URL".to_string()),
2714            params: None,
2715        };
2716        let a = DatabaseIdentity::from_connection("prod", &conn_a);
2717        let b = DatabaseIdentity::from_connection("prod", &conn_b);
2718        assert_ne!(a, b);
2719    }
2720
2721    #[test]
2722    fn status_default_has_empty_conditions() {
2723        let status = PostgresPolicyStatus::default();
2724        assert!(status.conditions.is_empty());
2725        assert!(status.observed_generation.is_none());
2726        assert!(status.last_attempted_generation.is_none());
2727        assert!(status.last_successful_reconcile_time.is_none());
2728        assert!(status.change_summary.is_none());
2729        assert!(status.managed_database_identity.is_none());
2730        assert!(status.owned_roles.is_empty());
2731        assert!(status.owned_schemas.is_empty());
2732        assert!(status.last_error.is_none());
2733        assert!(status.applied_password_source_versions.is_empty());
2734    }
2735
2736    #[test]
2737    fn status_degraded_workflow_sets_ready_false_and_degraded_true() {
2738        let mut status = PostgresPolicyStatus::default();
2739
2740        // Simulate a failed reconciliation: Ready=False + Degraded=True
2741        status.set_condition(ready_condition(false, "InvalidSpec", "bad manifest"));
2742        status.set_condition(degraded_condition("InvalidSpec", "bad manifest"));
2743        status
2744            .conditions
2745            .retain(|c| c.condition_type != "Reconciling" && c.condition_type != "Paused");
2746        status.change_summary = None;
2747        status.last_error = Some("bad manifest".to_string());
2748
2749        // Verify Ready=False
2750        let ready = status
2751            .conditions
2752            .iter()
2753            .find(|c| c.condition_type == "Ready")
2754            .expect("should have Ready condition");
2755        assert_eq!(ready.status, "False");
2756        assert_eq!(ready.reason.as_deref(), Some("InvalidSpec"));
2757
2758        // Verify Degraded=True
2759        let degraded = status
2760            .conditions
2761            .iter()
2762            .find(|c| c.condition_type == "Degraded")
2763            .expect("should have Degraded condition");
2764        assert_eq!(degraded.status, "True");
2765        assert_eq!(degraded.reason.as_deref(), Some("InvalidSpec"));
2766
2767        // Verify last_error is set
2768        assert_eq!(status.last_error.as_deref(), Some("bad manifest"));
2769    }
2770
2771    #[test]
2772    fn status_conflict_workflow() {
2773        let mut status = PostgresPolicyStatus::default();
2774
2775        // Simulate a conflict
2776        let msg = "policy ownership overlaps with staging/other on database target prod/db/URL";
2777        status.set_condition(ready_condition(false, "ConflictingPolicy", msg));
2778        status.set_condition(conflict_condition("ConflictingPolicy", msg));
2779        status.set_condition(degraded_condition("ConflictingPolicy", msg));
2780        status
2781            .conditions
2782            .retain(|c| c.condition_type != "Reconciling");
2783        status.last_error = Some(msg.to_string());
2784
2785        // Verify Conflict=True
2786        let conflict = status
2787            .conditions
2788            .iter()
2789            .find(|c| c.condition_type == "Conflict")
2790            .expect("should have Conflict condition");
2791        assert_eq!(conflict.status, "True");
2792        assert_eq!(conflict.reason.as_deref(), Some("ConflictingPolicy"));
2793
2794        // Verify Ready=False
2795        let ready = status
2796            .conditions
2797            .iter()
2798            .find(|c| c.condition_type == "Ready")
2799            .expect("should have Ready condition");
2800        assert_eq!(ready.status, "False");
2801
2802        // Verify Degraded=True
2803        let degraded = status
2804            .conditions
2805            .iter()
2806            .find(|c| c.condition_type == "Degraded")
2807            .expect("should have Degraded condition");
2808        assert_eq!(degraded.status, "True");
2809    }
2810
2811    #[test]
2812    fn status_successful_reconcile_records_generation_and_time() {
2813        let mut status = PostgresPolicyStatus::default();
2814        let generation = Some(3_i64);
2815        let summary = ChangeSummary {
2816            roles_created: 2,
2817            total: 2,
2818            ..Default::default()
2819        };
2820
2821        // Simulate a successful reconciliation
2822        status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
2823        status.conditions.retain(|c| {
2824            c.condition_type != "Reconciling"
2825                && c.condition_type != "Degraded"
2826                && c.condition_type != "Conflict"
2827                && c.condition_type != "Paused"
2828        });
2829        status.observed_generation = generation;
2830        status.last_attempted_generation = generation;
2831        status.last_successful_reconcile_time = Some(now_rfc3339());
2832        status.last_reconcile_time = Some(now_rfc3339());
2833        status.change_summary = Some(summary);
2834        status.last_error = None;
2835
2836        // Verify Ready=True
2837        let ready = status
2838            .conditions
2839            .iter()
2840            .find(|c| c.condition_type == "Ready")
2841            .expect("should have Ready condition");
2842        assert_eq!(ready.status, "True");
2843        assert_eq!(ready.reason.as_deref(), Some("Reconciled"));
2844
2845        // Verify generation recorded
2846        assert_eq!(status.observed_generation, Some(3));
2847        assert_eq!(status.last_attempted_generation, Some(3));
2848
2849        // Verify timestamps set
2850        assert!(status.last_successful_reconcile_time.is_some());
2851        assert!(status.last_reconcile_time.is_some());
2852
2853        // Verify summary
2854        let summary = status.change_summary.as_ref().unwrap();
2855        assert_eq!(summary.roles_created, 2);
2856        assert_eq!(summary.total, 2);
2857
2858        // Verify no error
2859        assert!(status.last_error.is_none());
2860
2861        // Verify no Degraded/Conflict/Paused/Reconciling conditions
2862        assert!(
2863            status
2864                .conditions
2865                .iter()
2866                .all(|c| c.condition_type != "Degraded"
2867                    && c.condition_type != "Conflict"
2868                    && c.condition_type != "Paused"
2869                    && c.condition_type != "Reconciling")
2870        );
2871    }
2872
2873    #[test]
2874    fn status_suspended_workflow() {
2875        let mut status = PostgresPolicyStatus::default();
2876        let generation = Some(2_i64);
2877
2878        // Simulate a suspended reconciliation
2879        status.set_condition(paused_condition("Reconciliation suspended by spec"));
2880        status.set_condition(ready_condition(
2881            false,
2882            "Suspended",
2883            "Reconciliation suspended by spec",
2884        ));
2885        status
2886            .conditions
2887            .retain(|c| c.condition_type != "Reconciling");
2888        status.last_attempted_generation = generation;
2889        status.last_error = None;
2890
2891        // Verify Paused=True
2892        let paused = status
2893            .conditions
2894            .iter()
2895            .find(|c| c.condition_type == "Paused")
2896            .expect("should have Paused condition");
2897        assert_eq!(paused.status, "True");
2898
2899        // Verify Ready=False with Suspended reason
2900        let ready = status
2901            .conditions
2902            .iter()
2903            .find(|c| c.condition_type == "Ready")
2904            .expect("should have Ready condition");
2905        assert_eq!(ready.status, "False");
2906        assert_eq!(ready.reason.as_deref(), Some("Suspended"));
2907
2908        // Verify no Reconciling condition
2909        assert!(
2910            !status
2911                .conditions
2912                .iter()
2913                .any(|c| c.condition_type == "Reconciling")
2914        );
2915    }
2916
2917    #[test]
2918    fn status_transitions_from_degraded_to_ready() {
2919        let mut status = PostgresPolicyStatus::default();
2920
2921        // First, set degraded state
2922        status.set_condition(ready_condition(false, "InvalidSpec", "error"));
2923        status.set_condition(degraded_condition("InvalidSpec", "error"));
2924        status.last_error = Some("error".to_string());
2925
2926        assert_eq!(status.conditions.len(), 2);
2927
2928        // Then, resolve to ready
2929        status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
2930        status.conditions.retain(|c| {
2931            c.condition_type != "Reconciling"
2932                && c.condition_type != "Degraded"
2933                && c.condition_type != "Conflict"
2934                && c.condition_type != "Paused"
2935        });
2936        status.last_error = None;
2937
2938        // Verify Ready=True
2939        let ready = status
2940            .conditions
2941            .iter()
2942            .find(|c| c.condition_type == "Ready")
2943            .expect("should have Ready condition");
2944        assert_eq!(ready.status, "True");
2945
2946        // Verify Degraded removed
2947        assert!(
2948            !status
2949                .conditions
2950                .iter()
2951                .any(|c| c.condition_type == "Degraded")
2952        );
2953
2954        // Verify only Ready condition remains
2955        assert_eq!(status.conditions.len(), 1);
2956
2957        // Verify error cleared
2958        assert!(status.last_error.is_none());
2959    }
2960
2961    #[test]
2962    fn change_summary_default_is_all_zero() {
2963        let summary = ChangeSummary::default();
2964        assert_eq!(summary.roles_created, 0);
2965        assert_eq!(summary.roles_altered, 0);
2966        assert_eq!(summary.roles_dropped, 0);
2967        assert_eq!(summary.sessions_terminated, 0);
2968        assert_eq!(summary.grants_added, 0);
2969        assert_eq!(summary.grants_revoked, 0);
2970        assert_eq!(summary.default_privileges_set, 0);
2971        assert_eq!(summary.default_privileges_revoked, 0);
2972        assert_eq!(summary.members_added, 0);
2973        assert_eq!(summary.members_removed, 0);
2974        assert_eq!(summary.total, 0);
2975    }
2976
2977    #[test]
2978    fn status_serializes_to_json() {
2979        let mut status = PostgresPolicyStatus::default();
2980        status.set_condition(ready_condition(true, "Reconciled", "done"));
2981        status.observed_generation = Some(5);
2982        status.managed_database_identity = Some("ns/secret/key".to_string());
2983        status.owned_roles = vec!["role-a".to_string(), "role-b".to_string()];
2984        status.owned_schemas = vec!["public".to_string()];
2985        status.change_summary = Some(ChangeSummary {
2986            roles_created: 1,
2987            total: 1,
2988            ..Default::default()
2989        });
2990
2991        let json = serde_json::to_string(&status).expect("should serialize");
2992        assert!(json.contains("\"Reconciled\""));
2993        assert!(json.contains("\"observed_generation\":5"));
2994        assert!(json.contains("\"role-a\""));
2995        assert!(json.contains("\"ns/secret/key\""));
2996    }
2997
2998    #[test]
2999    fn crd_spec_deserializes_from_yaml() {
3000        let yaml = r#"
3001connection:
3002  secretRef:
3003    name: pg-credentials
3004interval: "10m"
3005default_owner: app_owner
3006profiles:
3007  editor:
3008    grants:
3009      - privileges: [USAGE]
3010        object: { type: schema }
3011      - privileges: [SELECT, INSERT, UPDATE, DELETE]
3012        object: { type: table, name: "*" }
3013    default_privileges:
3014      - privileges: [SELECT, INSERT, UPDATE, DELETE]
3015        on_type: table
3016schemas:
3017  - name: inventory
3018    profiles: [editor]
3019roles:
3020  - name: analytics
3021    login: true
3022grants:
3023  - role: analytics
3024    privileges: [CONNECT]
3025    object: { type: database, name: mydb }
3026memberships:
3027  - role: inventory-editor
3028    members:
3029      - name: analytics
3030retirements:
3031  - role: legacy-app
3032    reassign_owned_to: app_owner
3033    drop_owned: true
3034    terminate_sessions: true
3035"#;
3036        let spec: PostgresPolicySpec = serde_yaml::from_str(yaml).expect("should deserialize");
3037        assert_eq!(spec.interval, "10m");
3038        assert_eq!(spec.default_owner, Some("app_owner".to_string()));
3039        assert_eq!(spec.profiles.len(), 1);
3040        assert!(spec.profiles.contains_key("editor"));
3041        assert_eq!(spec.schemas.len(), 1);
3042        assert_eq!(spec.roles.len(), 1);
3043        assert_eq!(spec.grants.len(), 1);
3044        assert_eq!(spec.memberships.len(), 1);
3045        assert_eq!(spec.retirements.len(), 1);
3046        assert_eq!(spec.retirements[0].role, "legacy-app");
3047        assert!(spec.retirements[0].terminate_sessions);
3048    }
3049
3050    #[test]
3051    fn referenced_secret_names_includes_connection_secret() {
3052        let spec = PostgresPolicySpec {
3053            connection: ConnectionSpec {
3054                secret_ref: Some(SecretReference {
3055                    name: "pg-conn".to_string(),
3056                }),
3057                secret_key: Some("DATABASE_URL".to_string()),
3058                params: None,
3059            },
3060            interval: "5m".to_string(),
3061            suspend: false,
3062            mode: PolicyMode::Apply,
3063            reconciliation_mode: CrdReconciliationMode::default(),
3064            default_owner: None,
3065            profiles: std::collections::HashMap::new(),
3066            schemas: vec![],
3067            roles: vec![],
3068            grants: vec![],
3069            default_privileges: vec![],
3070            memberships: vec![],
3071            retirements: vec![],
3072            approval: None,
3073        };
3074
3075        let names = spec.referenced_secret_names("test-policy");
3076        assert!(names.contains("pg-conn"));
3077        assert_eq!(names.len(), 1);
3078    }
3079
3080    #[test]
3081    fn referenced_secret_names_includes_password_secrets() {
3082        let spec = PostgresPolicySpec {
3083            connection: ConnectionSpec {
3084                secret_ref: Some(SecretReference {
3085                    name: "pg-conn".to_string(),
3086                }),
3087                secret_key: Some("DATABASE_URL".to_string()),
3088                params: None,
3089            },
3090            interval: "5m".to_string(),
3091            suspend: false,
3092            mode: PolicyMode::Apply,
3093            reconciliation_mode: CrdReconciliationMode::default(),
3094            default_owner: None,
3095            profiles: std::collections::HashMap::new(),
3096            schemas: vec![],
3097            roles: vec![
3098                RoleSpec {
3099                    name: "role-a".to_string(),
3100                    login: Some(true),
3101                    password: Some(PasswordSpec {
3102                        secret_ref: Some(SecretReference {
3103                            name: "role-passwords".to_string(),
3104                        }),
3105                        secret_key: Some("role-a".to_string()),
3106                        generate: None,
3107                    }),
3108                    password_valid_until: None,
3109                    superuser: None,
3110                    createdb: None,
3111                    createrole: None,
3112                    inherit: None,
3113                    replication: None,
3114                    bypassrls: None,
3115                    connection_limit: None,
3116                    comment: None,
3117                },
3118                RoleSpec {
3119                    name: "role-b".to_string(),
3120                    login: Some(true),
3121                    password: Some(PasswordSpec {
3122                        secret_ref: Some(SecretReference {
3123                            name: "other-secret".to_string(),
3124                        }),
3125                        secret_key: None,
3126                        generate: None,
3127                    }),
3128                    password_valid_until: None,
3129                    superuser: None,
3130                    createdb: None,
3131                    createrole: None,
3132                    inherit: None,
3133                    replication: None,
3134                    bypassrls: None,
3135                    connection_limit: None,
3136                    comment: None,
3137                },
3138                RoleSpec {
3139                    name: "role-c".to_string(),
3140                    login: None,
3141                    password: None,
3142                    password_valid_until: None,
3143                    superuser: None,
3144                    createdb: None,
3145                    createrole: None,
3146                    inherit: None,
3147                    replication: None,
3148                    bypassrls: None,
3149                    connection_limit: None,
3150                    comment: None,
3151                },
3152            ],
3153            grants: vec![],
3154            default_privileges: vec![],
3155            memberships: vec![],
3156            retirements: vec![],
3157            approval: None,
3158        };
3159
3160        let names = spec.referenced_secret_names("test-policy");
3161        assert!(
3162            names.contains("pg-conn"),
3163            "should include connection secret"
3164        );
3165        assert!(
3166            names.contains("role-passwords"),
3167            "should include role-a password secret"
3168        );
3169        assert!(
3170            names.contains("other-secret"),
3171            "should include role-b password secret"
3172        );
3173        assert_eq!(names.len(), 3);
3174    }
3175
3176    #[test]
3177    fn validate_password_specs_rejects_password_without_login() {
3178        let spec = PostgresPolicySpec {
3179            connection: ConnectionSpec {
3180                secret_ref: Some(SecretReference {
3181                    name: "pg-conn".to_string(),
3182                }),
3183                secret_key: Some("DATABASE_URL".to_string()),
3184                params: None,
3185            },
3186            interval: "5m".to_string(),
3187            suspend: false,
3188            mode: PolicyMode::Apply,
3189            reconciliation_mode: CrdReconciliationMode::default(),
3190            default_owner: None,
3191            profiles: std::collections::HashMap::new(),
3192            schemas: vec![],
3193            roles: vec![RoleSpec {
3194                name: "app-user".to_string(),
3195                login: Some(false),
3196                superuser: None,
3197                createdb: None,
3198                createrole: None,
3199                inherit: None,
3200                replication: None,
3201                bypassrls: None,
3202                connection_limit: None,
3203                comment: None,
3204                password: Some(PasswordSpec {
3205                    secret_ref: Some(SecretReference {
3206                        name: "role-passwords".to_string(),
3207                    }),
3208                    secret_key: None,
3209                    generate: None,
3210                }),
3211                password_valid_until: None,
3212            }],
3213            grants: vec![],
3214            default_privileges: vec![],
3215            memberships: vec![],
3216            retirements: vec![],
3217            approval: None,
3218        };
3219
3220        assert!(matches!(
3221            spec.validate_password_specs("test-policy"),
3222            Err(PasswordValidationError::PasswordWithoutLogin { ref role }) if role == "app-user"
3223        ));
3224    }
3225
3226    #[test]
3227    fn validate_password_specs_rejects_password_with_login_omitted() {
3228        let spec = PostgresPolicySpec {
3229            connection: ConnectionSpec {
3230                secret_ref: Some(SecretReference {
3231                    name: "pg-conn".to_string(),
3232                }),
3233                secret_key: Some("DATABASE_URL".to_string()),
3234                params: None,
3235            },
3236            interval: "5m".to_string(),
3237            suspend: false,
3238            mode: PolicyMode::Apply,
3239            reconciliation_mode: CrdReconciliationMode::default(),
3240            default_owner: None,
3241            profiles: std::collections::HashMap::new(),
3242            schemas: vec![],
3243            roles: vec![RoleSpec {
3244                name: "app-user".to_string(),
3245                login: None, // omitted, not explicitly false
3246                superuser: None,
3247                createdb: None,
3248                createrole: None,
3249                inherit: None,
3250                replication: None,
3251                bypassrls: None,
3252                connection_limit: None,
3253                comment: None,
3254                password: Some(PasswordSpec {
3255                    secret_ref: Some(SecretReference {
3256                        name: "role-passwords".to_string(),
3257                    }),
3258                    secret_key: None,
3259                    generate: None,
3260                }),
3261                password_valid_until: None,
3262            }],
3263            grants: vec![],
3264            default_privileges: vec![],
3265            memberships: vec![],
3266            retirements: vec![],
3267            approval: None,
3268        };
3269
3270        assert!(matches!(
3271            spec.validate_password_specs("test-policy"),
3272            Err(PasswordValidationError::PasswordWithoutLogin { ref role }) if role == "app-user"
3273        ));
3274    }
3275
3276    #[test]
3277    fn validate_password_specs_rejects_invalid_password_mode() {
3278        let spec = PostgresPolicySpec {
3279            connection: ConnectionSpec {
3280                secret_ref: Some(SecretReference {
3281                    name: "pg-conn".to_string(),
3282                }),
3283                secret_key: Some("DATABASE_URL".to_string()),
3284                params: None,
3285            },
3286            interval: "5m".to_string(),
3287            suspend: false,
3288            mode: PolicyMode::Apply,
3289            reconciliation_mode: CrdReconciliationMode::default(),
3290            default_owner: None,
3291            profiles: std::collections::HashMap::new(),
3292            schemas: vec![],
3293            roles: vec![RoleSpec {
3294                name: "app-user".to_string(),
3295                login: Some(true),
3296                superuser: None,
3297                createdb: None,
3298                createrole: None,
3299                inherit: None,
3300                replication: None,
3301                bypassrls: None,
3302                connection_limit: None,
3303                comment: None,
3304                password: Some(PasswordSpec {
3305                    secret_ref: Some(SecretReference {
3306                        name: "role-passwords".to_string(),
3307                    }),
3308                    secret_key: None,
3309                    generate: Some(GeneratePasswordSpec {
3310                        length: Some(32),
3311                        secret_name: None,
3312                        secret_key: None,
3313                    }),
3314                }),
3315                password_valid_until: None,
3316            }],
3317            grants: vec![],
3318            default_privileges: vec![],
3319            memberships: vec![],
3320            retirements: vec![],
3321            approval: None,
3322        };
3323
3324        assert!(matches!(
3325            spec.validate_password_specs("test-policy"),
3326            Err(PasswordValidationError::InvalidPasswordMode { ref role }) if role == "app-user"
3327        ));
3328    }
3329
3330    #[test]
3331    fn validate_password_specs_rejects_invalid_generated_length() {
3332        let spec = PostgresPolicySpec {
3333            connection: ConnectionSpec {
3334                secret_ref: Some(SecretReference {
3335                    name: "pg-conn".to_string(),
3336                }),
3337                secret_key: Some("DATABASE_URL".to_string()),
3338                params: None,
3339            },
3340            interval: "5m".to_string(),
3341            suspend: false,
3342            mode: PolicyMode::Apply,
3343            reconciliation_mode: CrdReconciliationMode::default(),
3344            default_owner: None,
3345            profiles: std::collections::HashMap::new(),
3346            schemas: vec![],
3347            roles: vec![RoleSpec {
3348                name: "app-user".to_string(),
3349                login: Some(true),
3350                superuser: None,
3351                createdb: None,
3352                createrole: None,
3353                inherit: None,
3354                replication: None,
3355                bypassrls: None,
3356                connection_limit: None,
3357                comment: None,
3358                password: Some(PasswordSpec {
3359                    secret_ref: None,
3360                    secret_key: None,
3361                    generate: Some(GeneratePasswordSpec {
3362                        length: Some(8),
3363                        secret_name: None,
3364                        secret_key: None,
3365                    }),
3366                }),
3367                password_valid_until: None,
3368            }],
3369            grants: vec![],
3370            default_privileges: vec![],
3371            memberships: vec![],
3372            retirements: vec![],
3373            approval: None,
3374        };
3375
3376        assert!(matches!(
3377            spec.validate_password_specs("test-policy"),
3378            Err(PasswordValidationError::InvalidGeneratedLength { ref role, .. }) if role == "app-user"
3379        ));
3380    }
3381
3382    #[test]
3383    fn validate_password_specs_rejects_invalid_generated_secret_key() {
3384        let spec = PostgresPolicySpec {
3385            connection: ConnectionSpec {
3386                secret_ref: Some(SecretReference {
3387                    name: "pg-conn".to_string(),
3388                }),
3389                secret_key: Some("DATABASE_URL".to_string()),
3390                params: None,
3391            },
3392            interval: "5m".to_string(),
3393            suspend: false,
3394            mode: PolicyMode::Apply,
3395            reconciliation_mode: CrdReconciliationMode::default(),
3396            default_owner: None,
3397            profiles: std::collections::HashMap::new(),
3398            schemas: vec![],
3399            roles: vec![RoleSpec {
3400                name: "app-user".to_string(),
3401                login: Some(true),
3402                superuser: None,
3403                createdb: None,
3404                createrole: None,
3405                inherit: None,
3406                replication: None,
3407                bypassrls: None,
3408                connection_limit: None,
3409                comment: None,
3410                password: Some(PasswordSpec {
3411                    secret_ref: None,
3412                    secret_key: None,
3413                    generate: Some(GeneratePasswordSpec {
3414                        length: Some(32),
3415                        secret_name: None,
3416                        secret_key: Some("bad/key".to_string()),
3417                    }),
3418                }),
3419                password_valid_until: None,
3420            }],
3421            grants: vec![],
3422            default_privileges: vec![],
3423            memberships: vec![],
3424            retirements: vec![],
3425            approval: None,
3426        };
3427
3428        assert!(matches!(
3429            spec.validate_password_specs("test-policy"),
3430            Err(PasswordValidationError::InvalidSecretKey { ref role, field, .. })
3431                if role == "app-user" && field == "generate.secretKey"
3432        ));
3433    }
3434
3435    #[test]
3436    fn validate_password_specs_rejects_invalid_generated_secret_name() {
3437        let spec = PostgresPolicySpec {
3438            connection: ConnectionSpec {
3439                secret_ref: Some(SecretReference {
3440                    name: "pg-conn".to_string(),
3441                }),
3442                secret_key: Some("DATABASE_URL".to_string()),
3443                params: None,
3444            },
3445            interval: "5m".to_string(),
3446            suspend: false,
3447            mode: PolicyMode::Apply,
3448            reconciliation_mode: CrdReconciliationMode::default(),
3449            default_owner: None,
3450            profiles: std::collections::HashMap::new(),
3451            schemas: vec![],
3452            roles: vec![RoleSpec {
3453                name: "app-user".to_string(),
3454                login: Some(true),
3455                superuser: None,
3456                createdb: None,
3457                createrole: None,
3458                inherit: None,
3459                replication: None,
3460                bypassrls: None,
3461                connection_limit: None,
3462                comment: None,
3463                password: Some(PasswordSpec {
3464                    secret_ref: None,
3465                    secret_key: None,
3466                    generate: Some(GeneratePasswordSpec {
3467                        length: Some(32),
3468                        secret_name: Some("Bad_Name".to_string()),
3469                        secret_key: None,
3470                    }),
3471                }),
3472                password_valid_until: None,
3473            }],
3474            grants: vec![],
3475            default_privileges: vec![],
3476            memberships: vec![],
3477            retirements: vec![],
3478            approval: None,
3479        };
3480
3481        assert!(matches!(
3482            spec.validate_password_specs("test-policy"),
3483            Err(PasswordValidationError::InvalidGeneratedSecretName { ref role, .. }) if role == "app-user"
3484        ));
3485    }
3486
3487    #[test]
3488    fn validate_password_specs_rejects_reserved_generated_secret_key() {
3489        let spec = PostgresPolicySpec {
3490            connection: ConnectionSpec {
3491                secret_ref: Some(SecretReference {
3492                    name: "pg-conn".to_string(),
3493                }),
3494                secret_key: Some("DATABASE_URL".to_string()),
3495                params: None,
3496            },
3497            interval: "5m".to_string(),
3498            suspend: false,
3499            mode: PolicyMode::Apply,
3500            reconciliation_mode: CrdReconciliationMode::default(),
3501            default_owner: None,
3502            profiles: std::collections::HashMap::new(),
3503            schemas: vec![],
3504            roles: vec![RoleSpec {
3505                name: "app-user".to_string(),
3506                login: Some(true),
3507                superuser: None,
3508                createdb: None,
3509                createrole: None,
3510                inherit: None,
3511                replication: None,
3512                bypassrls: None,
3513                connection_limit: None,
3514                comment: None,
3515                password: Some(PasswordSpec {
3516                    secret_ref: None,
3517                    secret_key: None,
3518                    generate: Some(GeneratePasswordSpec {
3519                        length: Some(32),
3520                        secret_name: None,
3521                        secret_key: Some("verifier".to_string()),
3522                    }),
3523                }),
3524                password_valid_until: None,
3525            }],
3526            grants: vec![],
3527            default_privileges: vec![],
3528            memberships: vec![],
3529            retirements: vec![],
3530            approval: None,
3531        };
3532
3533        assert!(matches!(
3534            spec.validate_password_specs("test-policy"),
3535            Err(PasswordValidationError::ReservedGeneratedSecretKey { ref role, ref key })
3536                if role == "app-user" && key == "verifier"
3537        ));
3538    }
3539
3540    #[test]
3541    fn plan_crd_generates_valid_schema() {
3542        let crd = PostgresPolicyPlan::crd();
3543        let yaml = serde_yaml::to_string(&crd).expect("CRD should serialize to YAML");
3544        assert!(yaml.contains("pgroles.io"), "group should be pgroles.io");
3545        assert!(yaml.contains("v1alpha1"), "version should be v1alpha1");
3546        assert!(
3547            yaml.contains("PostgresPolicyPlan"),
3548            "kind should be PostgresPolicyPlan"
3549        );
3550        assert!(yaml.contains("pgplan"), "should have shortname pgplan");
3551    }
3552
3553    #[test]
3554    fn plan_phase_display() {
3555        assert_eq!(PlanPhase::Pending.to_string(), "Pending");
3556        assert_eq!(PlanPhase::Approved.to_string(), "Approved");
3557        assert_eq!(PlanPhase::Applying.to_string(), "Applying");
3558        assert_eq!(PlanPhase::Applied.to_string(), "Applied");
3559        assert_eq!(PlanPhase::Failed.to_string(), "Failed");
3560        assert_eq!(PlanPhase::Superseded.to_string(), "Superseded");
3561    }
3562
3563    #[test]
3564    fn plan_phase_default_is_pending() {
3565        assert_eq!(PlanPhase::default(), PlanPhase::Pending);
3566    }
3567
3568    #[test]
3569    fn effective_approval_infers_from_mode() {
3570        let base = PostgresPolicySpec {
3571            connection: ConnectionSpec {
3572                secret_ref: Some(SecretReference {
3573                    name: "test".into(),
3574                }),
3575                secret_key: Some("DATABASE_URL".into()),
3576                params: None,
3577            },
3578            interval: "5m".into(),
3579            suspend: false,
3580            mode: PolicyMode::Apply,
3581            reconciliation_mode: CrdReconciliationMode::Authoritative,
3582            default_owner: None,
3583            profiles: Default::default(),
3584            schemas: vec![],
3585            roles: vec![],
3586            grants: vec![],
3587            default_privileges: vec![],
3588            memberships: vec![],
3589            retirements: vec![],
3590            approval: None,
3591        };
3592
3593        // apply mode with no explicit approval → Auto
3594        assert_eq!(base.effective_approval(), ApprovalMode::Auto);
3595
3596        // plan mode with no explicit approval → Manual
3597        let plan = PostgresPolicySpec {
3598            mode: PolicyMode::Plan,
3599            ..base.clone()
3600        };
3601        assert_eq!(plan.effective_approval(), ApprovalMode::Manual);
3602
3603        // explicit Manual overrides apply mode
3604        let explicit = PostgresPolicySpec {
3605            approval: Some(ApprovalMode::Manual),
3606            ..base.clone()
3607        };
3608        assert_eq!(explicit.effective_approval(), ApprovalMode::Manual);
3609    }
3610
3611    #[test]
3612    fn approval_mode_serde_roundtrip() {
3613        // Deserialize
3614        let manual: ApprovalMode = serde_json::from_str("\"manual\"").unwrap();
3615        assert_eq!(manual, ApprovalMode::Manual);
3616        let auto: ApprovalMode = serde_json::from_str("\"auto\"").unwrap();
3617        assert_eq!(auto, ApprovalMode::Auto);
3618
3619        // Serialize back
3620        let manual_json = serde_json::to_value(&ApprovalMode::Manual).unwrap();
3621        assert_eq!(manual_json, serde_json::Value::String("manual".to_string()));
3622        let auto_json = serde_json::to_value(&ApprovalMode::Auto).unwrap();
3623        assert_eq!(auto_json, serde_json::Value::String("auto".to_string()));
3624    }
3625
3626    #[test]
3627    fn plan_status_default_is_empty() {
3628        let status = PostgresPolicyPlanStatus::default();
3629        assert_eq!(status.phase, PlanPhase::Pending);
3630        assert!(status.conditions.is_empty());
3631        assert!(status.change_summary.is_none());
3632        assert!(status.sql_ref.is_none());
3633        assert!(status.sql_inline.is_none());
3634        assert!(status.computed_at.is_none());
3635        assert!(status.applied_at.is_none());
3636        assert!(status.last_error.is_none());
3637    }
3638
3639    #[test]
3640    fn spec_without_approval_field_deserializes_as_none() {
3641        let json = serde_json::json!({
3642            "connection": {
3643                "secretRef": { "name": "pg-secret" },
3644                "secretKey": "DATABASE_URL"
3645            },
3646            "interval": "5m",
3647            "suspend": false,
3648            "mode": "apply",
3649            "reconciliation_mode": "authoritative"
3650        });
3651
3652        let spec: PostgresPolicySpec =
3653            serde_json::from_value(json).expect("should deserialize without approval field");
3654        assert!(
3655            spec.approval.is_none(),
3656            "approval should be None when omitted"
3657        );
3658        assert_eq!(
3659            spec.effective_approval(),
3660            ApprovalMode::Auto,
3661            "effective_approval should infer Auto from apply mode"
3662        );
3663    }
3664
3665    #[test]
3666    fn status_without_current_plan_ref_deserializes_as_none() {
3667        let json = serde_json::json!({
3668            "conditions": [],
3669            "owned_roles": [],
3670            "owned_schemas": []
3671        });
3672
3673        let status: PostgresPolicyStatus =
3674            serde_json::from_value(json).expect("should deserialize without current_plan_ref");
3675        assert!(
3676            status.current_plan_ref.is_none(),
3677            "current_plan_ref should be None when omitted"
3678        );
3679    }
3680
3681    #[test]
3682    fn effective_approval_explicit_auto_overrides_plan_mode() {
3683        let spec = PostgresPolicySpec {
3684            connection: ConnectionSpec {
3685                secret_ref: Some(SecretReference {
3686                    name: "test".into(),
3687                }),
3688                secret_key: Some("DATABASE_URL".into()),
3689                params: None,
3690            },
3691            interval: "5m".into(),
3692            suspend: false,
3693            mode: PolicyMode::Plan,
3694            reconciliation_mode: CrdReconciliationMode::Authoritative,
3695            default_owner: None,
3696            profiles: Default::default(),
3697            schemas: vec![],
3698            roles: vec![],
3699            grants: vec![],
3700            default_privileges: vec![],
3701            memberships: vec![],
3702            retirements: vec![],
3703            approval: Some(ApprovalMode::Auto),
3704        };
3705
3706        assert_eq!(
3707            spec.effective_approval(),
3708            ApprovalMode::Auto,
3709            "explicit Auto should override Plan mode's default of Manual"
3710        );
3711    }
3712
3713    #[test]
3714    fn plan_phase_rejected_display() {
3715        assert_eq!(PlanPhase::Rejected.to_string(), "Rejected");
3716    }
3717
3718    #[test]
3719    fn plan_phase_all_variants_display() {
3720        let variants = [
3721            PlanPhase::Pending,
3722            PlanPhase::Approved,
3723            PlanPhase::Applying,
3724            PlanPhase::Applied,
3725            PlanPhase::Failed,
3726            PlanPhase::Superseded,
3727            PlanPhase::Rejected,
3728        ];
3729        for variant in &variants {
3730            let display = variant.to_string();
3731            assert!(
3732                !display.is_empty(),
3733                "PlanPhase::{variant:?} should have non-empty Display output"
3734            );
3735        }
3736    }
3737
3738    #[test]
3739    fn plan_status_defaults() {
3740        let status = PostgresPolicyPlanStatus::default();
3741        assert_eq!(status.phase, PlanPhase::Pending);
3742        assert!(status.conditions.is_empty());
3743        assert!(status.sql_ref.is_none());
3744        assert!(status.sql_hash.is_none());
3745        assert!(status.sql_inline.is_none());
3746        assert!(!status.sql_truncated);
3747        assert!(status.redacted_sql_hash.is_none());
3748        assert!(status.sql_original_bytes.is_none());
3749        assert!(status.sql_stored_bytes.is_none());
3750        assert!(status.change_summary.is_none());
3751        assert!(status.computed_at.is_none());
3752        assert!(status.applied_at.is_none());
3753        assert!(status.last_error.is_none());
3754    }
3755
3756    #[test]
3757    fn sql_ref_missing_compression_deserializes_as_uncompressed_legacy_shape() {
3758        let json = serde_json::json!({
3759            "name": "legacy-plan-sql",
3760            "key": "plan.sql"
3761        });
3762
3763        let sql_ref: SqlRef = serde_json::from_value(json).expect("legacy SqlRef should decode");
3764
3765        assert_eq!(sql_ref.name, "legacy-plan-sql");
3766        assert_eq!(sql_ref.key, "plan.sql");
3767        assert_eq!(sql_ref.compression, None);
3768    }
3769
3770    #[test]
3771    fn plan_spec_camel_case_serialization() {
3772        let spec = PostgresPolicyPlanSpec {
3773            policy_ref: PolicyPlanRef {
3774                name: "my-policy".into(),
3775            },
3776            policy_generation: 3,
3777            reconciliation_mode: CrdReconciliationMode::Authoritative,
3778            owned_roles: vec!["role-a".into()],
3779            owned_schemas: vec!["public".into()],
3780            managed_database_identity: "ns/secret/key".into(),
3781        };
3782
3783        let json = serde_json::to_value(&spec).expect("should serialize to JSON");
3784        let obj = json.as_object().expect("should be a JSON object");
3785
3786        assert!(
3787            obj.contains_key("policyRef"),
3788            "should use camelCase: policyRef"
3789        );
3790        assert!(
3791            obj.contains_key("policyGeneration"),
3792            "should use camelCase: policyGeneration"
3793        );
3794        assert!(
3795            obj.contains_key("reconciliationMode"),
3796            "should use camelCase: reconciliationMode"
3797        );
3798        assert!(
3799            obj.contains_key("ownedRoles"),
3800            "should use camelCase: ownedRoles"
3801        );
3802        assert!(
3803            obj.contains_key("ownedSchemas"),
3804            "should use camelCase: ownedSchemas"
3805        );
3806        assert!(
3807            obj.contains_key("managedDatabaseIdentity"),
3808            "should use camelCase: managedDatabaseIdentity"
3809        );
3810    }
3811}