Skip to main content

pgroles_operator/
reconciler.rs

1//! Reconciliation logic for `PostgresPolicy` custom resources.
2//!
3//! Implements the core reconcile loop: read desired state from the CR,
4//! inspect current state from the database, compute diff, and apply changes.
5//!
6//! Reconciliation is serialized per database target to prevent overlapping
7//! inspect/diff/apply cycles:
8//!
9//! 1. **In-process lock** — [`OperatorContext::try_lock_database`] prevents
10//!    concurrent reconciles within the same operator replica.
11//! 2. **PostgreSQL advisory lock** — [`crate::advisory::try_acquire`] prevents
12//!    concurrent operations across multiple operator replicas.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use crate::events::{PlanEventType, publish_plan_event, publish_status_events};
18use kube::ResourceExt;
19use kube::api::{Api, Patch, PatchParams};
20use kube::runtime::controller::Action;
21use kube::runtime::finalizer::{self, Event as FinalizerEvent};
22use tracing::info;
23
24use crate::context::{ContextError, OperatorContext};
25use crate::crd::{
26    ChangeSummary, DatabaseIdentity, PolicyMode, PostgresPolicy, PostgresPolicyPlan,
27    PostgresPolicyStatus, conflict_condition, degraded_condition, drifted_condition,
28    paused_condition, ready_condition, reconciling_condition,
29};
30
/// Finalizer name for PostgresPolicy resources.
const FINALIZER: &str = "pgroles.io/finalizer";

/// Default requeue interval when no interval is specified on the CR.
///
/// Also used when the interval sums to zero seconds, and as the slow-retry
/// delay when the interval cannot be parsed.
const DEFAULT_REQUEUE_SECS: u64 = 300; // 5 minutes

/// Base requeue delay when lock contention is detected.
const LOCK_CONTENTION_BASE_SECS: u64 = 10;

/// Maximum jitter (inclusive) added to the base requeue delay on lock contention.
const LOCK_CONTENTION_JITTER_SECS: u64 = 20;

/// Base requeue delay when transient operational failures occur.
const TRANSIENT_BACKOFF_BASE_SECS: u64 = 5;

/// Maximum requeue delay for transient operational failures.
const TRANSIENT_BACKOFF_MAX_SECS: u64 = 300;

// PostgreSQL SQLSTATE codes used to classify database errors as
// non-transient (see `classify_sqlx_error`).

/// SQLSTATE `42501` (`insufficient_privilege`).
const SQLSTATE_INSUFFICIENT_PRIVILEGE: &str = "42501";
/// SQLSTATE `3F000` (`invalid_schema_name`): the referenced schema does not exist.
const SQLSTATE_INVALID_SCHEMA_NAME: &str = "3F000";
/// SQLSTATE `42P01` (`undefined_table`).
const SQLSTATE_UNDEFINED_TABLE: &str = "42P01";
/// SQLSTATE `42883` (`undefined_function`).
const SQLSTATE_UNDEFINED_FUNCTION: &str = "42883";
/// SQLSTATE `42704` (`undefined_object`).
const SQLSTATE_UNDEFINED_OBJECT: &str = "42704";

/// Maximum amount of rendered planned SQL stored in status, in bytes.
const MAX_PLANNED_SQL_STATUS_BYTES: usize = 16 * 1024;
58
/// Outcome label of a reconcile pass, recorded through the observability
/// guard as a `(result, reason)` pair.
enum ReconcileOutcome {
    Reconciled,
    Planned,
    Suspended,
    Conflict,
    LockContention,
}

impl ReconcileOutcome {
    /// The `(result, reason)` labels for this outcome.
    fn labels(&self) -> (&'static str, &'static str) {
        match self {
            Self::Reconciled => ("success", "Reconciled"),
            Self::Planned => ("planned", "Planned"),
            Self::Suspended => ("suspended", "Suspended"),
            Self::Conflict => ("conflict", "ConflictingPolicy"),
            Self::LockContention => ("contention", "LockContention"),
        }
    }

    /// Short result label (e.g. `"success"`).
    fn result(&self) -> &'static str {
        self.labels().0
    }

    /// Reason label reported alongside [`Self::result`].
    fn reason(&self) -> &'static str {
        self.labels().1
    }
}

/// Requeue strategy selected for a failed reconcile.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RetryClass {
    /// Non-transient failure: retry on the policy's normal interval.
    Slow,
    /// Another reconcile holds the database lock: retry after a jittered delay.
    LockContention,
    /// Operational hiccup: retry with exponential back-off.
    Transient,
}
95
96/// Errors that can occur during reconciliation.
97#[derive(Debug, thiserror::Error)]
98pub enum ReconcileError {
99    #[error("context error: {0}")]
100    Context(#[from] Box<ContextError>),
101
102    #[error("manifest expansion error: {0}")]
103    ManifestExpansion(#[from] pgroles_core::manifest::ManifestError),
104
105    #[error("database inspection error: {0}")]
106    Inspect(#[from] pgroles_inspect::InspectError),
107
108    #[error("SQL execution error: {0}")]
109    SqlExec(#[from] sqlx::Error),
110
111    #[error("{0}")]
112    UnsafeRoleDrops(String),
113
114    #[error("Kubernetes API error: {0}")]
115    Kube(#[from] kube::Error),
116
117    #[error("resource has no namespace")]
118    NoNamespace,
119
120    #[error("invalid interval \"{0}\": {1}")]
121    InvalidInterval(String, String),
122
123    #[error("invalid spec: {0}")]
124    InvalidSpec(String),
125
126    #[error(
127        "policy references objects that do not exist in target database: {0}. Either create \
128         the missing objects, remove them from the policy, or verify the policy is pointing at \
129         the intended database."
130    )]
131    MissingDatabaseObjects(String),
132
133    #[error("{0}")]
134    UnsatisfiableWildcardGrant(String),
135
136    #[error("{0}")]
137    ConflictingPolicy(String),
138
139    #[error("lock contention on database \"{0}\": {1}")]
140    LockContention(String, String),
141
142    #[error("Secret \"{secret}\" key \"{key}\" for role \"{role}\" password is empty")]
143    EmptyPasswordSecret {
144        role: String,
145        secret: String,
146        key: String,
147    },
148
149    #[error("password generation error: {0}")]
150    PasswordGeneration(#[from] Box<crate::password::PasswordError>),
151
152    #[error("plan SQL storage error: {0}")]
153    PlanSqlStorage(String),
154}
155
/// A role password resolved for reconciliation, together with a marker of
/// the source it came from.
///
/// `Debug` is implemented by hand so the cleartext is never written to logs
/// or assertion messages; only `source_version` is shown.
#[derive(Clone, PartialEq, Eq)]
struct ResolvedPassword {
    /// The password in cleartext.
    cleartext: String,
    /// Opaque version marker for the password's origin. NOTE(review): presumably
    /// the Secret revision or generation run it was read from — confirm at call sites.
    source_version: String,
}

impl std::fmt::Debug for ResolvedPassword {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedPassword")
            .field("cleartext", &"<redacted>")
            .field("source_version", &self.source_version)
            .finish()
    }
}
161
162/// Parse a duration string like "5m", "1h", "30s", "2h30m".
163fn parse_interval(interval: &str) -> Result<Duration, ReconcileError> {
164    let interval = interval.trim();
165    if interval.is_empty() {
166        return Ok(Duration::from_secs(DEFAULT_REQUEUE_SECS));
167    }
168
169    let mut total_secs: u64 = 0;
170    let mut current_num = String::new();
171
172    for ch in interval.chars() {
173        if ch.is_ascii_digit() {
174            current_num.push(ch);
175        } else {
176            let num: u64 = current_num.parse().map_err(|_| {
177                ReconcileError::InvalidInterval(
178                    interval.to_string(),
179                    format!("invalid number before '{ch}'"),
180                )
181            })?;
182            current_num.clear();
183
184            match ch {
185                'h' => total_secs += num * 3600,
186                'm' => total_secs += num * 60,
187                's' => total_secs += num,
188                _ => {
189                    return Err(ReconcileError::InvalidInterval(
190                        interval.to_string(),
191                        format!("unknown unit '{ch}'"),
192                    ));
193                }
194            }
195        }
196    }
197
198    // If there's a trailing number with no unit, treat as seconds.
199    if !current_num.is_empty() {
200        let num: u64 = current_num.parse().map_err(|_| {
201            ReconcileError::InvalidInterval(interval.to_string(), "trailing number".to_string())
202        })?;
203        total_secs += num;
204    }
205
206    if total_secs == 0 {
207        return Ok(Duration::from_secs(DEFAULT_REQUEUE_SECS));
208    }
209
210    Ok(Duration::from_secs(total_secs))
211}
212
213/// Top-level reconcile entry point called by the kube-rs controller runtime.
214///
215/// Uses the finalizer pattern for cleanup on deletion.
216pub async fn reconcile(
217    resource: Arc<PostgresPolicy>,
218    ctx: Arc<OperatorContext>,
219) -> Result<Action, finalizer::Error<ReconcileError>> {
220    let api: Api<PostgresPolicy> = Api::namespaced(
221        ctx.kube_client.clone(),
222        resource.namespace().as_deref().unwrap_or("default"),
223    );
224
225    finalizer::finalizer(&api, FINALIZER, resource, |event| async {
226        match event {
227            FinalizerEvent::Apply(resource) => reconcile_apply(&resource, &ctx).await,
228            FinalizerEvent::Cleanup(resource) => reconcile_cleanup(&resource, &ctx).await,
229        }
230    })
231    .await
232}
233
234/// Error handler — called when reconcile returns an error.
235pub fn error_policy(
236    resource: Arc<PostgresPolicy>,
237    error: &finalizer::Error<ReconcileError>,
238    _ctx: Arc<OperatorContext>,
239) -> Action {
240    retry_action(&resource, error)
241}
242
243fn retry_action(resource: &PostgresPolicy, error: &finalizer::Error<ReconcileError>) -> Action {
244    match retry_class(error) {
245        RetryClass::LockContention => {
246            if let finalizer::Error::ApplyFailed(ReconcileError::LockContention(db, reason)) = error
247            {
248                tracing::info!(database = %db, reason = %reason, "requeuing due to lock contention");
249            }
250            requeue_with_jitter()
251        }
252        RetryClass::Slow => {
253            let delay = slow_retry_delay(resource);
254            tracing::info!(
255                delay_secs = delay.as_secs(),
256                error = %error,
257                "requeuing on normal interval for non-transient failure"
258            );
259            Action::requeue(delay)
260        }
261        RetryClass::Transient => {
262            let attempts = next_transient_failure_count(resource);
263            let delay = transient_backoff_delay(attempts);
264            tracing::warn!(
265                attempts,
266                delay_secs = delay.as_secs(),
267                error = %error,
268                "requeuing with exponential backoff after transient failure"
269            );
270            Action::requeue(delay)
271        }
272    }
273}
274
275/// Compute a requeue delay with jitter for lock contention back-off.
276fn requeue_with_jitter() -> Action {
277    let delay = jitter_delay();
278    tracing::debug!(delay_secs = delay.as_secs(), "requeue with jitter");
279    Action::requeue(delay)
280}
281
282/// Compute a jittered delay for lock contention back-off.
283///
284/// Returns a [`Duration`] in the range
285/// `[LOCK_CONTENTION_BASE_SECS, LOCK_CONTENTION_BASE_SECS + LOCK_CONTENTION_JITTER_SECS]`.
286fn jitter_delay() -> Duration {
287    // Simple jitter: base + pseudo-random portion of the jitter window.
288    // We combine subsecond nanos with a hash of the thread ID for better
289    // entropy when multiple reconciles hit contention simultaneously.
290    let nanos = std::time::SystemTime::now()
291        .duration_since(std::time::UNIX_EPOCH)
292        .unwrap_or_default()
293        .subsec_nanos();
294    let thread_entropy = {
295        use std::hash::{Hash, Hasher};
296        let mut hasher = std::collections::hash_map::DefaultHasher::new();
297        std::thread::current().id().hash(&mut hasher);
298        hasher.finish() as u32
299    };
300    let jitter_secs = ((nanos ^ thread_entropy) as u64) % (LOCK_CONTENTION_JITTER_SECS + 1);
301    Duration::from_secs(LOCK_CONTENTION_BASE_SECS + jitter_secs)
302}
303
304fn transient_backoff_delay(attempts: u32) -> Duration {
305    let exponent = attempts.saturating_sub(1).min(10);
306    let base_delay = TRANSIENT_BACKOFF_BASE_SECS
307        .saturating_mul(1_u64 << exponent)
308        .min(TRANSIENT_BACKOFF_MAX_SECS);
309    let remaining_headroom = TRANSIENT_BACKOFF_MAX_SECS.saturating_sub(base_delay);
310    let jitter_window = remaining_headroom.min((base_delay / 2).max(1));
311    let jitter_secs = if jitter_window == 0 {
312        0
313    } else {
314        pseudo_random_window(jitter_window)
315    };
316    Duration::from_secs((base_delay + jitter_secs).min(TRANSIENT_BACKOFF_MAX_SECS))
317}
318
319fn pseudo_random_window(window_secs: u64) -> u64 {
320    if window_secs == 0 {
321        return 0;
322    }
323    let nanos = std::time::SystemTime::now()
324        .duration_since(std::time::UNIX_EPOCH)
325        .unwrap_or_default()
326        .subsec_nanos();
327    let thread_entropy = {
328        use std::hash::{Hash, Hasher};
329        let mut hasher = std::collections::hash_map::DefaultHasher::new();
330        std::thread::current().id().hash(&mut hasher);
331        hasher.finish() as u32
332    };
333    ((nanos ^ thread_entropy) as u64) % (window_secs + 1)
334}
335
336fn retry_class(error: &finalizer::Error<ReconcileError>) -> RetryClass {
337    match error {
338        finalizer::Error::ApplyFailed(reconcile_error) => {
339            retry_class_for_reconcile_error(reconcile_error)
340        }
341        finalizer::Error::CleanupFailed(_)
342        | finalizer::Error::AddFinalizer(_)
343        | finalizer::Error::RemoveFinalizer(_)
344        | finalizer::Error::UnnamedObject
345        | finalizer::Error::InvalidFinalizer => RetryClass::Transient,
346    }
347}
348
349fn retry_class_for_reconcile_error(error: &ReconcileError) -> RetryClass {
350    match error {
351        ReconcileError::LockContention(_, _) => RetryClass::LockContention,
352        ReconcileError::ManifestExpansion(_)
353        | ReconcileError::InvalidInterval(_, _)
354        | ReconcileError::InvalidSpec(_)
355        | ReconcileError::MissingDatabaseObjects(_)
356        | ReconcileError::UnsatisfiableWildcardGrant(_)
357        | ReconcileError::ConflictingPolicy(_)
358        | ReconcileError::UnsafeRoleDrops(_)
359        | ReconcileError::EmptyPasswordSecret { .. }
360        | ReconcileError::NoNamespace
361        | ReconcileError::PlanSqlStorage(_) => RetryClass::Slow,
362        ReconcileError::PasswordGeneration(err) => {
363            if err.is_transient() {
364                RetryClass::Transient
365            } else {
366                RetryClass::Slow
367            }
368        }
369        ReconcileError::Context(context) => match context.as_ref() {
370            ContextError::SecretMissing { .. } => RetryClass::Slow,
371            ContextError::SecretFetch { .. } => {
372                if context.is_secret_fetch_non_transient() {
373                    RetryClass::Slow
374                } else {
375                    RetryClass::Transient
376                }
377            }
378            ContextError::DatabaseConnect { .. } => RetryClass::Transient,
379            ContextError::EmptyResolvedValue { .. }
380            | ContextError::InvalidResolvedSslMode { .. } => RetryClass::Slow,
381        },
382        ReconcileError::Inspect(error) => {
383            if inspect_error_is_non_transient(error) {
384                RetryClass::Slow
385            } else {
386                RetryClass::Transient
387            }
388        }
389        ReconcileError::SqlExec(error) => {
390            if sqlx_error_is_non_transient(error) {
391                RetryClass::Slow
392            } else {
393                RetryClass::Transient
394            }
395        }
396        ReconcileError::Kube(_) => RetryClass::Transient,
397    }
398}
399
400fn inspect_error_is_non_transient(error: &pgroles_inspect::InspectError) -> bool {
401    match error {
402        pgroles_inspect::InspectError::Database(error) => sqlx_error_is_non_transient(error),
403    }
404}
405
406/// Classification of a database-level SQL error for retry and status reporting.
407#[derive(Debug, Clone, Copy, PartialEq, Eq)]
408enum SqlErrorKind {
409    /// Insufficient privileges (SQLSTATE 42501) — RBAC-style failure,
410    /// won't fix itself.
411    InsufficientPrivileges,
412    /// A referenced schema, relation, function, or object does not exist
413    /// (SQLSTATE 3F000, 42P01, 42883, 42704). Typically a policy/environment
414    /// mismatch that needs operator action.
415    MissingDatabaseObject,
416    /// Everything else — retry with exponential backoff.
417    Transient,
418}
419
420fn classify_sqlx_error(error: &sqlx::Error) -> SqlErrorKind {
421    match error
422        .as_database_error()
423        .and_then(|database_error| database_error.code())
424        .as_deref()
425    {
426        Some(SQLSTATE_INSUFFICIENT_PRIVILEGE) => SqlErrorKind::InsufficientPrivileges,
427        Some(SQLSTATE_INVALID_SCHEMA_NAME)
428        | Some(SQLSTATE_UNDEFINED_TABLE)
429        | Some(SQLSTATE_UNDEFINED_FUNCTION)
430        | Some(SQLSTATE_UNDEFINED_OBJECT) => SqlErrorKind::MissingDatabaseObject,
431        _ => SqlErrorKind::Transient,
432    }
433}
434
435fn sqlx_error_is_non_transient(error: &sqlx::Error) -> bool {
436    !matches!(classify_sqlx_error(error), SqlErrorKind::Transient)
437}
438
439fn next_transient_failure_count(resource: &PostgresPolicy) -> u32 {
440    resource
441        .status
442        .as_ref()
443        .map(|status| status.transient_failure_count.max(0) as u32)
444        .unwrap_or(0)
445        .saturating_add(1)
446}
447
448fn slow_retry_delay(resource: &PostgresPolicy) -> Duration {
449    parse_interval(&resource.spec.interval)
450        .unwrap_or_else(|_| Duration::from_secs(DEFAULT_REQUEUE_SECS))
451}
452
453/// Collect every schema name referenced by an expanded manifest.
454///
455/// Covers schema-type grants (where the schema is in `object.name`), grants on
456/// objects within a schema (where the schema is in `object.schema`), and
457/// default privileges (which always carry a schema).
458fn referenced_schema_names(
459    expanded: &pgroles_core::manifest::ExpandedManifest,
460) -> std::collections::BTreeSet<String> {
461    let mut names: std::collections::BTreeSet<String> = expanded
462        .schemas
463        .iter()
464        .map(|schema| schema.name.clone())
465        .collect();
466    for grant in &expanded.grants {
467        if grant.object.object_type == pgroles_core::manifest::ObjectType::Schema
468            && let Some(name) = &grant.object.name
469        {
470            names.insert(name.clone());
471        }
472        if let Some(schema) = &grant.object.schema {
473            names.insert(schema.clone());
474        }
475    }
476    for dp in &expanded.default_privileges {
477        names.insert(dp.schema.clone());
478    }
479    names
480}
481
482fn declared_schema_names(
483    expanded: &pgroles_core::manifest::ExpandedManifest,
484) -> std::collections::BTreeSet<String> {
485    expanded
486        .schemas
487        .iter()
488        .map(|schema| schema.name.clone())
489        .collect()
490}
491
/// Returns true for PostgreSQL system schemas (`pg_*` and
/// `information_schema`).
///
/// These schemas always exist in a database but are excluded from
/// [`pgroles_inspect::fetch_existing_schemas`], so they must be skipped when
/// checking that referenced schemas exist. The match is case-sensitive.
fn is_system_schema(name: &str) -> bool {
    name.starts_with("pg_") || name == "information_schema"
}
500
501/// Pre-flight check: ensure every schema referenced by the policy exists in
502/// the target database. Returns [`ReconcileError::MissingDatabaseObjects`]
503/// listing the missing schemas if any are absent.
504///
505/// System schemas (`pg_*`, `information_schema`) are excluded from the check
506/// since they always exist but are filtered out of the inspect query.
507async fn validate_referenced_schemas_exist(
508    pool: &sqlx::PgPool,
509    expanded: &pgroles_core::manifest::ExpandedManifest,
510) -> Result<(), ReconcileError> {
511    let referenced = externally_required_schema_names(expanded);
512    if referenced.is_empty() {
513        return Ok(());
514    }
515    let existing = pgroles_inspect::fetch_existing_schemas(pool).await?;
516    let missing: Vec<String> = referenced
517        .into_iter()
518        .filter(|name| !existing.contains(name))
519        .collect();
520    if missing.is_empty() {
521        Ok(())
522    } else {
523        let formatted = missing
524            .iter()
525            .map(|name| format!("schema \"{name}\""))
526            .collect::<Vec<_>>()
527            .join(", ");
528        Err(ReconcileError::MissingDatabaseObjects(formatted))
529    }
530}
531
532fn externally_required_schema_names(
533    expanded: &pgroles_core::manifest::ExpandedManifest,
534) -> std::collections::BTreeSet<String> {
535    let declared = declared_schema_names(expanded);
536    referenced_schema_names(expanded)
537        .into_iter()
538        .filter(|name| !is_system_schema(name) && !declared.contains(name))
539        .collect()
540}
541
542/// Apply reconciliation — the main "ensure desired state" logic.
543///
544/// The in-process per-database lock is acquired *inside* [`reconcile_apply_inner`],
545/// after the connection probe succeeds. That way a bad-credentials,
546/// secret-fetch, or spec-validation failure produces an ordinary error which
547/// flows through the status-updating path below, instead of competing for
548/// the lock with parallel reconciles for the same `database_identity`. With
549/// three policies sharing one secret, a Secret rotation to bad credentials
550/// would otherwise serialize them on the lock — each holding it for the full
551/// `POOL_ACQUIRE_TIMEOUT_SECS` worth of pool-timeout — and an unlucky policy
552/// could spend tens of seconds in lock-contention requeues without ever
553/// updating its status condition (lock contention is silent by design).
554async fn reconcile_apply(
555    resource: &PostgresPolicy,
556    ctx: &OperatorContext,
557) -> Result<Action, ReconcileError> {
558    let reconcile_guard = ctx.observability.start_reconcile();
559
560    let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
561    let identity = DatabaseIdentity::from_connection(&namespace, &resource.spec.connection);
562
563    match reconcile_apply_inner(resource, ctx, &identity).await {
564        Ok((action, outcome)) => {
565            reconcile_guard.record_result(outcome.result(), outcome.reason());
566            Ok(action)
567        }
568        Err(ReconcileError::LockContention(db, reason)) => {
569            // Lock contention is expected during normal multi-replica operation.
570            // Re-raise without setting Degraded status to avoid false alarms.
571            ctx.observability.record_lock_contention();
572            reconcile_guard.record_result(
573                ReconcileOutcome::LockContention.result(),
574                ReconcileOutcome::LockContention.reason(),
575            );
576            tracing::info!(database = %db, %reason, "lock contention — will requeue");
577            Err(ReconcileError::LockContention(db, reason))
578        }
579        Err(err) => {
580            let error_message = err.to_string();
581            let error_reason = err.reason();
582            let is_transient_failure =
583                retry_class_for_reconcile_error(&err) == RetryClass::Transient;
584            // Unsatisfiable wildcards would regenerate the same impossible plan.
585            // Other failures keep any plan ref so the same plan can be retried.
586            let clear_current_plan_ref =
587                matches!(&err, ReconcileError::UnsatisfiableWildcardGrant(_));
588            match error_reason {
589                "DatabaseConnectionFailed" => {
590                    ctx.observability.record_database_connection_failure()
591                }
592                "InvalidSpec" => ctx.observability.record_invalid_spec(),
593                "ConflictingPolicy" => ctx.observability.record_policy_conflict(),
594                "ApplyFailed" | "MissingDatabaseObject" | "UnsatisfiableWildcardGrant" => {
595                    ctx.observability.record_apply_result("error")
596                }
597                _ => {}
598            }
599            reconcile_guard.record_result("error", error_reason);
600            if let Err(status_err) = update_status(ctx, resource, |status| {
601                mark_reconcile_failure_status(
602                    status,
603                    error_reason,
604                    &error_message,
605                    is_transient_failure,
606                    clear_current_plan_ref,
607                );
608            })
609            .await
610            {
611                tracing::warn!(%status_err, "failed to update degraded status");
612            }
613            Err(err)
614        }
615    }
616}
617
618fn mark_reconcile_failure_status(
619    status: &mut PostgresPolicyStatus,
620    error_reason: &str,
621    error_message: &str,
622    is_transient_failure: bool,
623    clear_current_plan_ref: bool,
624) {
625    status.set_condition(ready_condition(false, error_reason, error_message));
626    status.set_condition(degraded_condition(error_reason, error_message));
627    status.conditions.retain(|c| {
628        c.condition_type != "Reconciling"
629            && c.condition_type != "Paused"
630            && c.condition_type != "Drifted"
631            && c.condition_type != "Conflict"
632    });
633    status.change_summary = None;
634    status.planned_sql = None;
635    status.planned_sql_truncated = false;
636    if clear_current_plan_ref {
637        status.current_plan_ref = None;
638    }
639    status.last_error = Some(error_message.to_string());
640    if is_transient_failure {
641        status.transient_failure_count += 1;
642    } else {
643        status.transient_failure_count = 0;
644    }
645}
646
647async fn reconcile_apply_inner(
648    resource: &PostgresPolicy,
649    ctx: &OperatorContext,
650    identity: &DatabaseIdentity,
651) -> Result<(Action, ReconcileOutcome), ReconcileError> {
652    let name = resource.name_any();
653    let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
654
655    let spec = &resource.spec;
656    let requeue_interval = parse_interval(&spec.interval)?;
657    let generation = resource.metadata.generation;
658
659    // If suspended, just requeue without doing anything.
660    if spec.suspend {
661        update_status(ctx, resource, |status| {
662            status.set_condition(paused_condition("Reconciliation suspended by spec"));
663            status.set_condition(ready_condition(
664                false,
665                "Suspended",
666                "Reconciliation suspended by spec",
667            ));
668            status
669                .conditions
670                .retain(|c| c.condition_type != "Reconciling" && c.condition_type != "Drifted");
671            status.last_attempted_generation = generation;
672            status.last_error = None;
673            status.planned_sql = None;
674            status.planned_sql_truncated = false;
675            status.transient_failure_count = 0;
676        })
677        .await?;
678        info!(name, namespace, "reconciliation suspended, requeuing");
679        return Ok((
680            Action::requeue(requeue_interval),
681            ReconcileOutcome::Suspended,
682        ));
683    }
684
685    info!(name, namespace, "starting reconciliation");
686
687    // Update status to "Reconciling".
688    // Note: do NOT clear last_error here — it should persist until a successful
689    // reconcile clears it. Clearing on every retry cycle would race with the
690    // error handler that sets it.
691    update_status(ctx, resource, |status| {
692        status.set_condition(reconciling_condition("Reconciliation in progress"));
693        status
694            .conditions
695            .retain(|c| c.condition_type != "Paused" && c.condition_type != "Drifted");
696        status.last_attempted_generation = generation;
697    })
698    .await?;
699
700    spec.validate_connection_spec()
701        .map_err(|err| ReconcileError::InvalidSpec(err.to_string()))?;
702    spec.validate_password_specs(&name)
703        .map_err(|err| ReconcileError::InvalidSpec(err.to_string()))?;
704
705    let ownership = spec.ownership_claims()?;
706    update_status(ctx, resource, |status| {
707        status.managed_database_identity = Some(identity.as_str().to_string());
708        status.owned_roles = ownership.roles.iter().cloned().collect();
709        status.owned_schemas = ownership.schemas.iter().cloned().collect();
710    })
711    .await?;
712
713    if let Some(conflict_message) =
714        detect_policy_conflict(ctx, resource, identity, &ownership).await?
715    {
716        update_status(ctx, resource, |status| {
717            status.set_condition(ready_condition(
718                false,
719                "ConflictingPolicy",
720                &conflict_message,
721            ));
722            status.set_condition(conflict_condition("ConflictingPolicy", &conflict_message));
723            status.set_condition(degraded_condition("ConflictingPolicy", &conflict_message));
724            status
725                .conditions
726                .retain(|c| c.condition_type != "Reconciling" && c.condition_type != "Drifted");
727            status.change_summary = None;
728            status.planned_sql = None;
729            status.planned_sql_truncated = false;
730            status.last_error = Some(conflict_message.clone());
731            status.transient_failure_count = 0;
732        })
733        .await?;
734        ctx.observability.record_policy_conflict();
735        info!(name, namespace, %conflict_message, "reconciliation blocked by conflicting policy");
736        return Ok((
737            Action::requeue(requeue_interval),
738            ReconcileOutcome::Conflict,
739        ));
740    }
741
742    // 1. Convert CRD spec to core manifest.
743    let manifest = spec.to_policy_manifest();
744
745    // 2. Expand the manifest (profiles × schemas → concrete roles/grants).
746    let expanded = pgroles_core::manifest::expand_manifest(&manifest)?;
747
748    // 3. Build desired RoleGraph from expanded manifest.
749    let default_owner = manifest.default_owner.as_deref();
750    let desired = pgroles_core::model::RoleGraph::from_expanded(&expanded, default_owner)?;
751
752    // 4. Get a database pool.
753    //
754    // This is the connection probe: a bad URL, refused TCP connection, or
755    // failed authentication surfaces here as `ContextError::DatabaseConnect`,
756    // which the outer error handler in `reconcile_apply` translates into a
757    // `Ready=False/DatabaseConnectionFailed` status update. We do this BEFORE
758    // taking the in-process lock so multiple policies sharing the same
759    // `database_identity` can all observe a connection failure in parallel,
760    // instead of serializing on the lock and starving each other under
761    // sustained bad-credentials conditions.
762    let pool = ctx
763        .get_or_create_pool(&namespace, &spec.connection)
764        .await
765        .map_err(Box::new)?;
766
767    // 5. Acquire the in-process per-database lock for the DDL phase.
768    //
769    // The lock serializes inspect+diff+apply against a single
770    // `database_identity` within this replica, so two reconciles can't
771    // compute conflicting plans and stack DDL on top of each other. It is
772    // explicitly NOT held during the connection-probe phase above, since
773    // that work is idempotent and side-effect-free against the database.
774    //
775    // `_db_lock` must outlive the advisory lock and `apply_under_lock`
776    // call below; it is dropped at the end of this function.
777    let _db_lock = match ctx.try_lock_database(identity.as_str()).await {
778        Some(guard) => guard,
779        None => {
780            return Err(ReconcileError::LockContention(
781                identity.as_str().to_string(),
782                "in-process lock held by another reconcile".to_string(),
783            ));
784        }
785    };
786
787    // 6. Acquire PostgreSQL advisory lock for cross-replica safety.
788    let advisory_lock = match crate::advisory::try_acquire(&pool, identity.as_str()).await {
789        Ok(Some(lock)) => lock,
790        Ok(None) => {
791            return Err(ReconcileError::LockContention(
792                identity.as_str().to_string(),
793                "PostgreSQL advisory lock held by another session".to_string(),
794            ));
795        }
796        Err(err) => {
797            tracing::warn!(%err, "failed to acquire advisory lock — treating as connection error");
798            return Err(ReconcileError::SqlExec(err));
799        }
800    };
801
802    // Wrap the remaining work so the advisory lock is released on all paths.
803    let result = apply_under_lock(
804        resource,
805        ctx,
806        &pool,
807        &manifest,
808        &expanded,
809        &desired,
810        generation,
811        requeue_interval,
812        &name,
813        &namespace,
814        identity,
815    )
816    .await;
817
818    // Release advisory lock (always, even on error).
819    advisory_lock.release().await;
820
821    crate::plan::cleanup_old_plans_best_effort(&ctx.kube_client, resource, None).await;
822
823    result
824}
825
826/// Execute the inspect/diff/apply cycle while both locks are held.
827///
828/// Extracted to keep `reconcile_apply_inner` focused on lock acquisition.
829#[allow(clippy::too_many_arguments)]
830async fn apply_under_lock(
831    resource: &PostgresPolicy,
832    ctx: &OperatorContext,
833    pool: &sqlx::PgPool,
834    manifest: &pgroles_core::manifest::PolicyManifest,
835    expanded: &pgroles_core::manifest::ExpandedManifest,
836    desired: &pgroles_core::model::RoleGraph,
837    generation: Option<i64>,
838    requeue_interval: Duration,
839    name: &str,
840    namespace: &str,
841    identity: &DatabaseIdentity,
842) -> Result<(Action, ReconcileOutcome), ReconcileError> {
843    // 5b. Recover stuck Applying plans (operator may have crashed mid-apply).
844    if let Some(stuck_plan) =
845        crate::plan::get_plan_by_phase(&ctx.kube_client, resource, crate::crd::PlanPhase::Applying)
846            .await?
847    {
848        let applying_since_secs = stuck_plan
849            .status
850            .as_ref()
851            .and_then(|s| s.applying_since.as_deref())
852            .and_then(parse_rfc3339_to_epoch_secs);
853        if let Some(since_secs) = applying_since_secs {
854            let now_secs = std::time::SystemTime::now()
855                .duration_since(std::time::UNIX_EPOCH)
856                .unwrap_or_default()
857                .as_secs();
858            let elapsed_secs = now_secs.saturating_sub(since_secs);
859            let stuck_threshold_secs = 5 * 60; // 5 minutes
860            if elapsed_secs > stuck_threshold_secs {
861                tracing::warn!(
862                    plan = %stuck_plan.name_any(),
863                    elapsed_secs,
864                    "detected stuck Applying plan — marking as Failed"
865                );
866                crate::plan::mark_plan_failed(
867                    &ctx.kube_client,
868                    &stuck_plan,
869                    "execution interrupted: operator restarted during apply",
870                )
871                .await?;
872            }
873        }
874    }
875
876    // 6. Inspect current state from the database.
877    let has_database_grants = expanded
878        .grants
879        .iter()
880        .any(|g| g.object.object_type == pgroles_core::manifest::ObjectType::Database);
881    let inspect_config =
882        pgroles_inspect::InspectConfig::from_expanded(expanded, has_database_grants)
883            .with_additional_roles(
884                manifest
885                    .retirements
886                    .iter()
887                    .map(|retirement| retirement.role.clone()),
888            );
889    let inspection = pgroles_inspect::inspect_with_diagnostics(pool, &inspect_config).await?;
890    ctx.observability.record_inspection(&inspection.stats);
891    if !inspection.diagnostics.is_empty() {
892        return Err(ReconcileError::UnsatisfiableWildcardGrant(
893            inspection.diagnostics.to_string(),
894        ));
895    }
896    let current = inspection.graph;
897
898    // 6b. Pre-flight: validate that every schema referenced by the policy
899    // exists in the target database. This turns a mid-transaction
900    // `schema "X" does not exist` failure into a clear spec/environment
901    // mismatch error before we issue any DDL.
902    validate_referenced_schemas_exist(pool, expanded).await?;
903
904    // 7. Compute diff, filter by reconciliation mode, then inject password
905    // changes resolved from Kubernetes Secrets.
906    let reconciliation_mode: pgroles_core::diff::ReconciliationMode =
907        resource.spec.reconciliation_mode.into();
908    tracing::info!(%reconciliation_mode, "reconciliation mode");
909    let mut changes = pgroles_core::diff::filter_changes(
910        pgroles_core::diff::apply_role_retirements(
911            pgroles_core::diff::diff(&current, desired),
912            &manifest.retirements,
913        ),
914        reconciliation_mode,
915    );
916
917    let resolved_passwords = resolve_passwords_from_secrets(ctx, resource, namespace).await?;
918    let (password_changes, applied_password_source_versions) =
919        select_password_changes(&changes, &resolved_passwords, resource.status.as_ref());
920    if !password_changes.is_empty() {
921        changes = pgroles_core::diff::inject_password_changes(changes, &password_changes);
922    }
923    let dropped_roles: Vec<String> = changes
924        .iter()
925        .filter_map(|change| match change {
926            pgroles_core::diff::Change::DropRole { name } => Some(name.clone()),
927            _ => None,
928        })
929        .collect();
930    let drop_safety = pgroles_inspect::inspect_drop_role_safety(pool, &dropped_roles)
931        .await?
932        .assess(&manifest.retirements);
933    if !drop_safety.warnings.is_empty() {
934        tracing::info!(warnings = %drop_safety.warnings, "role-drop cleanup warnings");
935    }
936    if drop_safety.has_blockers() {
937        return Err(ReconcileError::UnsafeRoleDrops(
938            drop_safety.blockers.to_string(),
939        ));
940    }
941
942    let summary = summarize_changes(&changes);
943    let sql_ctx = detect_sql_context(pool, &inspect_config).await?;
944    let (planned_sql, planned_sql_truncated) = render_plan_sql_for_status(&changes, &sql_ctx);
945
946    let effective_approval = resource.spec.effective_approval();
947
948    if resource.spec.mode == PolicyMode::Plan {
949        let drift_detected = !changes.is_empty();
950        let ready_message = if drift_detected {
951            format!("Plan computed; {} change(s) pending", summary.total)
952        } else {
953            "Plan computed; database already matches desired state".to_string()
954        };
955        let drift_reason = if drift_detected {
956            "DriftDetected"
957        } else {
958            "InSync"
959        };
960        let drift_message = if drift_detected {
961            format!("{} planned change(s) pending review", summary.total)
962        } else {
963            "No pending changes".to_string()
964        };
965
966        ctx.observability
967            .record_plan_result(if drift_detected { "drift" } else { "clean" });
968        ctx.observability
969            .record_planned_changes(summary.total.max(0) as usize);
970
971        // Create a PostgresPolicyPlan resource for changes (if any).
972        let mut plan_ref_name = None;
973        if drift_detected {
974            let creation_result = crate::plan::create_or_update_plan(
975                &ctx.kube_client,
976                resource,
977                &changes,
978                &sql_ctx,
979                &inspect_config,
980                resource.spec.reconciliation_mode,
981                identity.as_str(),
982                &summary,
983            )
984            .await?;
985            let plan_name = creation_result.plan_name().to_string();
986
987            // Only emit PlanCreated event for genuinely new plans, not dedup hits.
988            if creation_result.is_created() {
989                let plans_api: Api<PostgresPolicyPlan> =
990                    Api::namespaced(ctx.kube_client.clone(), namespace);
991                let created_plan = plans_api.get(&plan_name).await?;
992                emit_plan_event(
993                    ctx,
994                    resource,
995                    &created_plan,
996                    PlanEventType::Created {
997                        change_count: summary.total,
998                    },
999                )
1000                .await;
1001            }
1002
1003            crate::plan::update_policy_plan_ref(&ctx.kube_client, resource, &plan_name).await?;
1004
1005            plan_ref_name = Some(plan_name);
1006        }
1007
1008        // Still write deprecated planned_sql to status for backward compat.
1009        update_status(ctx, resource, |status| {
1010            status.set_condition(ready_condition(true, "Planned", &ready_message));
1011            status.set_condition(drifted_condition(
1012                drift_detected,
1013                drift_reason,
1014                &drift_message,
1015            ));
1016            status.conditions.retain(|c| {
1017                c.condition_type != "Reconciling"
1018                    && c.condition_type != "Degraded"
1019                    && c.condition_type != "Conflict"
1020                    && c.condition_type != "Paused"
1021            });
1022            status.observed_generation = generation;
1023            status.last_attempted_generation = generation;
1024            status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1025            status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1026            status.change_summary = Some(summary.clone());
1027            status.last_reconcile_mode = Some(PolicyMode::Plan);
1028            status.planned_sql = planned_sql.clone();
1029            status.planned_sql_truncated = planned_sql_truncated;
1030            status.last_error = None;
1031            status.transient_failure_count = 0;
1032            if let Some(ref plan_name) = plan_ref_name {
1033                status.current_plan_ref = Some(crate::crd::PlanReference {
1034                    name: plan_name.clone(),
1035                });
1036            }
1037        })
1038        .await?;
1039
1040        info!(
1041            name,
1042            namespace,
1043            total = summary.total,
1044            drift_detected,
1045            "plan reconciliation complete"
1046        );
1047        return Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned));
1048    }
1049
1050    // Apply mode — behavior depends on effective approval mode.
1051    match effective_approval {
1052        crate::crd::ApprovalMode::Auto => {
1053            // Auto-approval: create plan -> immediately execute -> update status.
1054            // This wraps the existing apply behavior in the plan lifecycle.
1055            if !changes.is_empty() {
1056                let creation_result = crate::plan::create_or_update_plan(
1057                    &ctx.kube_client,
1058                    resource,
1059                    &changes,
1060                    &sql_ctx,
1061                    &inspect_config,
1062                    resource.spec.reconciliation_mode,
1063                    identity.as_str(),
1064                    &summary,
1065                )
1066                .await?;
1067                let plan_name = creation_result.plan_name().to_string();
1068
1069                // Fetch the plan, mark it approved, and execute it.
1070                let plans_api: Api<PostgresPolicyPlan> =
1071                    Api::namespaced(ctx.kube_client.clone(), namespace);
1072                let plan = plans_api.get(&plan_name).await?;
1073
1074                if creation_result.is_created() {
1075                    emit_plan_event(
1076                        ctx,
1077                        resource,
1078                        &plan,
1079                        PlanEventType::Created {
1080                            change_count: summary.total,
1081                        },
1082                    )
1083                    .await;
1084                }
1085
1086                crate::plan::mark_plan_approved(
1087                    &ctx.kube_client,
1088                    &plan,
1089                    "AutoApproved",
1090                    "Plan auto-approved by policy approval mode",
1091                )
1092                .await?;
1093
1094                // Re-fetch after approval status update.
1095                let plan = plans_api.get(&plan_name).await?;
1096                emit_plan_event(ctx, resource, &plan, PlanEventType::Approved).await;
1097                emit_plan_event(ctx, resource, &plan, PlanEventType::ApplyStarted).await;
1098
1099                match crate::plan::execute_plan(&ctx.kube_client, &plan, pool, &sql_ctx, &changes)
1100                    .await
1101                {
1102                    Ok(()) => {
1103                        emit_plan_event(ctx, resource, &plan, PlanEventType::ApplySucceeded).await;
1104                    }
1105                    Err(err) => {
1106                        emit_plan_event(
1107                            ctx,
1108                            resource,
1109                            &plan,
1110                            PlanEventType::ApplyFailed {
1111                                error: err.to_string(),
1112                            },
1113                        )
1114                        .await;
1115                        return Err(err);
1116                    }
1117                }
1118
1119                ctx.observability.record_apply_result("success");
1120
1121                crate::plan::update_policy_plan_ref(&ctx.kube_client, resource, &plan_name).await?;
1122
1123                info!(
1124                    name,
1125                    namespace,
1126                    total = summary.total,
1127                    plan = %plan_name,
1128                    "auto-approved plan applied"
1129                );
1130            } else {
1131                info!(name, namespace, "no changes needed");
1132            }
1133
1134            // Update status to Ready.
1135            update_status(ctx, resource, |status| {
1136                status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
1137                status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
1138                status.conditions.retain(|c| {
1139                    c.condition_type != "Reconciling"
1140                        && c.condition_type != "Degraded"
1141                        && c.condition_type != "Conflict"
1142                        && c.condition_type != "Paused"
1143                });
1144                status.observed_generation = generation;
1145                status.last_attempted_generation = generation;
1146                status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1147                status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1148                status.change_summary = Some(summary);
1149                status.last_reconcile_mode = Some(PolicyMode::Apply);
1150                status.planned_sql = None;
1151                status.planned_sql_truncated = false;
1152                status.last_error = None;
1153                status.applied_password_source_versions = applied_password_source_versions;
1154                status.transient_failure_count = 0;
1155            })
1156            .await?;
1157
1158            Ok((
1159                Action::requeue(requeue_interval),
1160                ReconcileOutcome::Reconciled,
1161            ))
1162        }
1163        crate::crd::ApprovalMode::Manual => {
1164            // Manual approval: check for an existing approved plan, or create one.
1165
1166            // First, check if there is a current pending plan that has been approved.
1167            if let Some(current_plan) =
1168                crate::plan::get_current_actionable_plan(&ctx.kube_client, resource).await?
1169            {
1170                let approval_state = crate::plan::check_plan_approval(&current_plan);
1171
1172                match approval_state {
1173                    crate::plan::PlanApprovalState::Approved => {
1174                        // Validate that the database state has not drifted since
1175                        // the plan was approved by comparing SQL hashes.
1176                        let fresh_sql = crate::plan::render_full_sql(&changes, &sql_ctx);
1177                        let fresh_hash = crate::plan::compute_sql_hash(&fresh_sql);
1178                        let stored_hash = current_plan
1179                            .status
1180                            .as_ref()
1181                            .and_then(|s| s.sql_hash.as_deref());
1182
1183                        if stored_hash != Some(&fresh_hash) {
1184                            // Database state changed since the plan was approved.
1185                            tracing::warn!(
1186                                plan = %current_plan.name_any(),
1187                                stored_hash = ?stored_hash,
1188                                fresh_hash = %fresh_hash,
1189                                "approved plan superseded: database state changed since approval"
1190                            );
1191
1192                            crate::plan::mark_plan_superseded(&ctx.kube_client, &current_plan)
1193                                .await?;
1194
1195                            // Create a new plan with the fresh changes.
1196                            let new_creation_result = crate::plan::create_or_update_plan(
1197                                &ctx.kube_client,
1198                                resource,
1199                                &changes,
1200                                &sql_ctx,
1201                                &inspect_config,
1202                                resource.spec.reconciliation_mode,
1203                                identity.as_str(),
1204                                &summary,
1205                            )
1206                            .await?;
1207                            let new_plan_name = new_creation_result.plan_name().to_string();
1208
1209                            if new_creation_result.is_created() {
1210                                let plans_api: Api<PostgresPolicyPlan> =
1211                                    Api::namespaced(ctx.kube_client.clone(), namespace);
1212                                let new_plan = plans_api.get(&new_plan_name).await?;
1213                                emit_plan_event(
1214                                    ctx,
1215                                    resource,
1216                                    &new_plan,
1217                                    PlanEventType::Created {
1218                                        change_count: summary.total,
1219                                    },
1220                                )
1221                                .await;
1222                            }
1223
1224                            crate::plan::update_policy_plan_ref(
1225                                &ctx.kube_client,
1226                                resource,
1227                                &new_plan_name,
1228                            )
1229                            .await?;
1230
1231                            let msg = format!(
1232                                "Plan {} superseded (DB state changed); new plan {} created with {} change(s) awaiting approval",
1233                                current_plan.name_any(),
1234                                new_plan_name,
1235                                summary.total,
1236                            );
1237                            update_status(ctx, resource, |status| {
1238                                status.set_condition(ready_condition(true, "Planned", &msg));
1239                                status.set_condition(drifted_condition(
1240                                    true,
1241                                    "DriftDetected",
1242                                    &format!("{} planned change(s) pending review", summary.total),
1243                                ));
1244                                status.conditions.retain(|c| {
1245                                    c.condition_type != "Reconciling"
1246                                        && c.condition_type != "Degraded"
1247                                        && c.condition_type != "Conflict"
1248                                        && c.condition_type != "Paused"
1249                                });
1250                                status.last_attempted_generation = generation;
1251                                status.change_summary = Some(summary.clone());
1252                                status.last_reconcile_mode = Some(PolicyMode::Apply);
1253                                status.planned_sql = planned_sql.clone();
1254                                status.planned_sql_truncated = planned_sql_truncated;
1255                                status.last_error = None;
1256                                status.transient_failure_count = 0;
1257                                status.current_plan_ref = Some(crate::crd::PlanReference {
1258                                    name: new_plan_name.clone(),
1259                                });
1260                            })
1261                            .await?;
1262
1263                            return Ok((
1264                                Action::requeue(requeue_interval),
1265                                ReconcileOutcome::Planned,
1266                            ));
1267                        }
1268
1269                        // Hash matches — safe to execute the approved plan.
1270                        info!(
1271                            name,
1272                            namespace,
1273                            plan = %current_plan.name_any(),
1274                            "executing manually approved plan"
1275                        );
1276
1277                        emit_plan_event(ctx, resource, &current_plan, PlanEventType::Approved)
1278                            .await;
1279
1280                        crate::plan::mark_plan_approved(
1281                            &ctx.kube_client,
1282                            &current_plan,
1283                            "ManuallyApproved",
1284                            "Plan approved via annotation",
1285                        )
1286                        .await?;
1287
1288                        let plans_api: Api<PostgresPolicyPlan> =
1289                            Api::namespaced(ctx.kube_client.clone(), namespace);
1290                        let plan = plans_api.get(&current_plan.name_any()).await?;
1291
1292                        emit_plan_event(ctx, resource, &plan, PlanEventType::ApplyStarted).await;
1293
1294                        match crate::plan::execute_plan(
1295                            &ctx.kube_client,
1296                            &plan,
1297                            pool,
1298                            &sql_ctx,
1299                            &changes,
1300                        )
1301                        .await
1302                        {
1303                            Ok(()) => {
1304                                emit_plan_event(
1305                                    ctx,
1306                                    resource,
1307                                    &plan,
1308                                    PlanEventType::ApplySucceeded,
1309                                )
1310                                .await;
1311                            }
1312                            Err(err) => {
1313                                emit_plan_event(
1314                                    ctx,
1315                                    resource,
1316                                    &plan,
1317                                    PlanEventType::ApplyFailed {
1318                                        error: err.to_string(),
1319                                    },
1320                                )
1321                                .await;
1322                                return Err(err);
1323                            }
1324                        }
1325
1326                        ctx.observability.record_apply_result("success");
1327
1328                        // Update status to Ready.
1329                        update_status(ctx, resource, |status| {
1330                            status.set_condition(ready_condition(
1331                                true,
1332                                "Reconciled",
1333                                "Approved plan applied",
1334                            ));
1335                            status.set_condition(drifted_condition(
1336                                false,
1337                                "InSync",
1338                                "No pending changes",
1339                            ));
1340                            status.conditions.retain(|c| {
1341                                c.condition_type != "Reconciling"
1342                                    && c.condition_type != "Degraded"
1343                                    && c.condition_type != "Conflict"
1344                                    && c.condition_type != "Paused"
1345                            });
1346                            status.observed_generation = generation;
1347                            status.last_attempted_generation = generation;
1348                            status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1349                            status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1350                            status.change_summary = Some(summary);
1351                            status.last_reconcile_mode = Some(PolicyMode::Apply);
1352                            status.planned_sql = None;
1353                            status.planned_sql_truncated = false;
1354                            status.last_error = None;
1355                            status.applied_password_source_versions =
1356                                applied_password_source_versions;
1357                            status.transient_failure_count = 0;
1358                        })
1359                        .await?;
1360
1361                        return Ok((
1362                            Action::requeue(requeue_interval),
1363                            ReconcileOutcome::Reconciled,
1364                        ));
1365                    }
1366                    crate::plan::PlanApprovalState::Rejected => {
1367                        crate::plan::mark_plan_rejected(&ctx.kube_client, &current_plan).await?;
1368                        emit_plan_event(ctx, resource, &current_plan, PlanEventType::Rejected)
1369                            .await;
1370                        info!(
1371                            name,
1372                            namespace,
1373                            plan = %current_plan.name_any(),
1374                            "plan rejected via annotation"
1375                        );
1376
1377                        // Update status to reflect rejection, but don't create a new plan
1378                        // in the same cycle to avoid tight reject-create loops.
1379                        update_status(ctx, resource, |status| {
1380                            status.set_condition(ready_condition(
1381                                true,
1382                                "Planned",
1383                                &format!(
1384                                    "Plan {} rejected; new plan will be created on next reconcile",
1385                                    current_plan.name_any()
1386                                ),
1387                            ));
1388                            status.last_attempted_generation = generation;
1389                            status.last_error = None;
1390                            status.transient_failure_count = 0;
1391                            status.current_plan_ref = None;
1392                        })
1393                        .await?;
1394
1395                        return Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned));
1396                    }
1397                    crate::plan::PlanApprovalState::Pending => {
1398                        // Plan exists and is pending — nothing to do, requeue.
1399                        info!(
1400                            name,
1401                            namespace,
1402                            plan = %current_plan.name_any(),
1403                            "plan awaiting manual approval"
1404                        );
1405
1406                        update_status(ctx, resource, |status| {
1407                            let msg = format!(
1408                                "Plan {} awaiting approval; {} change(s) pending",
1409                                current_plan.name_any(),
1410                                summary.total,
1411                            );
1412                            status.set_condition(ready_condition(true, "Planned", &msg));
1413                            status.set_condition(drifted_condition(
1414                                !changes.is_empty(),
1415                                if changes.is_empty() {
1416                                    "InSync"
1417                                } else {
1418                                    "DriftDetected"
1419                                },
1420                                &msg,
1421                            ));
1422                            status.conditions.retain(|c| {
1423                                c.condition_type != "Reconciling"
1424                                    && c.condition_type != "Degraded"
1425                                    && c.condition_type != "Conflict"
1426                                    && c.condition_type != "Paused"
1427                            });
1428                            status.last_attempted_generation = generation;
1429                            status.change_summary = Some(summary.clone());
1430                            status.planned_sql = planned_sql.clone();
1431                            status.planned_sql_truncated = planned_sql_truncated;
1432                            status.last_error = None;
1433                            status.transient_failure_count = 0;
1434                        })
1435                        .await?;
1436
1437                        return Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned));
1438                    }
1439                }
1440            }
1441
1442            // No pending plan (or previous one was rejected) — create a new plan.
1443            if changes.is_empty() {
1444                info!(name, namespace, "no changes needed (manual approval mode)");
1445
1446                update_status(ctx, resource, |status| {
1447                    status.set_condition(ready_condition(true, "Reconciled", "No changes needed"));
1448                    status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
1449                    status.conditions.retain(|c| {
1450                        c.condition_type != "Reconciling"
1451                            && c.condition_type != "Degraded"
1452                            && c.condition_type != "Conflict"
1453                            && c.condition_type != "Paused"
1454                    });
1455                    status.observed_generation = generation;
1456                    status.last_attempted_generation = generation;
1457                    status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1458                    status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1459                    status.change_summary = Some(summary);
1460                    status.last_reconcile_mode = Some(PolicyMode::Apply);
1461                    status.planned_sql = None;
1462                    status.planned_sql_truncated = false;
1463                    status.last_error = None;
1464                    status.applied_password_source_versions = applied_password_source_versions;
1465                    status.transient_failure_count = 0;
1466                })
1467                .await?;
1468
1469                return Ok((
1470                    Action::requeue(requeue_interval),
1471                    ReconcileOutcome::Reconciled,
1472                ));
1473            }
1474
1475            // Create a new plan and wait for approval.
1476            let creation_result = crate::plan::create_or_update_plan(
1477                &ctx.kube_client,
1478                resource,
1479                &changes,
1480                &sql_ctx,
1481                &inspect_config,
1482                resource.spec.reconciliation_mode,
1483                identity.as_str(),
1484                &summary,
1485            )
1486            .await?;
1487            let plan_name = creation_result.plan_name().to_string();
1488
1489            // Only emit PlanCreated event for genuinely new plans, not dedup hits.
1490            if creation_result.is_created() {
1491                let plans_api: Api<PostgresPolicyPlan> =
1492                    Api::namespaced(ctx.kube_client.clone(), namespace);
1493                let created_plan = plans_api.get(&plan_name).await?;
1494                emit_plan_event(
1495                    ctx,
1496                    resource,
1497                    &created_plan,
1498                    PlanEventType::Created {
1499                        change_count: summary.total,
1500                    },
1501                )
1502                .await;
1503            }
1504
1505            crate::plan::update_policy_plan_ref(&ctx.kube_client, resource, &plan_name).await?;
1506
1507            let msg = format!(
1508                "Plan {plan_name} created; {} change(s) awaiting approval",
1509                summary.total,
1510            );
1511            update_status(ctx, resource, |status| {
1512                status.set_condition(ready_condition(true, "Planned", &msg));
1513                status.set_condition(drifted_condition(
1514                    true,
1515                    "DriftDetected",
1516                    &format!("{} planned change(s) pending review", summary.total),
1517                ));
1518                status.conditions.retain(|c| {
1519                    c.condition_type != "Reconciling"
1520                        && c.condition_type != "Degraded"
1521                        && c.condition_type != "Conflict"
1522                        && c.condition_type != "Paused"
1523                });
1524                status.last_attempted_generation = generation;
1525                status.change_summary = Some(summary.clone());
1526                status.last_reconcile_mode = Some(PolicyMode::Apply);
1527                status.planned_sql = planned_sql.clone();
1528                status.planned_sql_truncated = planned_sql_truncated;
1529                status.last_error = None;
1530                status.transient_failure_count = 0;
1531                status.current_plan_ref = Some(crate::crd::PlanReference {
1532                    name: plan_name.clone(),
1533                });
1534            })
1535            .await?;
1536
1537            info!(
1538                name,
1539                namespace,
1540                total = summary.total,
1541                plan = %plan_name,
1542                "plan created, awaiting manual approval"
1543            );
1544
1545            Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned))
1546        }
1547    }
1548}
1549
/// Resolve role passwords from Kubernetes Secrets or generate them.
///
/// For each role that declares a `password`:
/// - `generate` set (checked first, so it wins over `secretRef`): in
///   [`PolicyMode::Plan`] the existing generated Secret is read if present;
///   if it is missing, a fresh password is synthesized in memory only (no
///   Secret is written) and tagged with the source version returned by
///   `crate::password::missing_generated_secret_source_version`. In every
///   other mode `crate::password::ensure_generated_secret` is called, which
///   creates the Secret if needed.
/// - otherwise, `secretRef` set: the password is read from the referenced
///   Secret via [`resolve_password_from_cache`].
/// - neither set: the role is skipped.
///
/// Every Secret named by any role's `secretRef` is fetched up front (once per
/// distinct name) before any password is generated; a failed fetch aborts the
/// whole call with `ContextError::SecretFetch`.
///
/// Returns a map of role name → [`ResolvedPassword`] (cleartext plus a
/// `source_version` string that is later compared against the status to
/// decide whether a password must be re-applied). The cleartext is intended
/// for [`pgroles_core::diff::inject_password_changes`] (which computes the
/// SCRAM-SHA-256 verifier before creating `SetPassword` changes).
async fn resolve_passwords_from_secrets(
    ctx: &OperatorContext,
    resource: &PostgresPolicy,
    namespace: &str,
) -> Result<std::collections::BTreeMap<String, ResolvedPassword>, ReconcileError> {
    use k8s_openapi::api::core::v1::Secret;

    let mut resolved = std::collections::BTreeMap::new();

    // Cache fetched Secrets by name to avoid duplicate API calls when
    // multiple roles reference different keys in the same Secret.
    let mut secret_cache: std::collections::BTreeMap<String, Secret> =
        std::collections::BTreeMap::new();

    let secrets_api: kube::Api<Secret> = kube::Api::namespaced(ctx.kube_client.clone(), namespace);

    // First pass: fetch all referenced Secrets for secretRef roles.
    // NOTE: this only inspects `secret_ref`, so a role that also sets
    // `generate` still has its Secret fetched (and can fail the call) here,
    // even though the second pass will use the generated password instead.
    for role_spec in &resource.spec.roles {
        if let Some(pw) = &role_spec.password
            && let Some(secret_ref) = &pw.secret_ref
        {
            let secret_name = &secret_ref.name;
            if !secret_cache.contains_key(secret_name.as_str()) {
                let fetched = secrets_api.get(secret_name).await.map_err(|err| {
                    Box::new(crate::context::ContextError::SecretFetch {
                        name: secret_name.clone(),
                        namespace: namespace.to_string(),
                        source: err,
                    })
                })?;
                secret_cache.insert(secret_name.clone(), fetched);
            }
        }
    }

    // Second pass: resolve passwords from cache (secretRef) or generate.
    for role_spec in &resource.spec.roles {
        if let Some(pw) = &role_spec.password {
            if let Some(gen_spec) = &pw.generate {
                let password = if resource.spec.mode == PolicyMode::Plan {
                    // Plan mode must not mutate the cluster: only read the
                    // generated Secret, never create it.
                    match crate::password::get_generated_secret(
                        ctx.kube_client.clone(),
                        namespace,
                        &resource.name_any(),
                        &role_spec.name,
                        gen_spec,
                    )
                    .await
                    .map_err(Box::new)?
                    {
                        Some(existing) => existing,
                        None => {
                            // Secret not created yet: synthesize a throwaway
                            // password in memory. The cleartext is random on
                            // every call; the source version is derived from
                            // the would-be Secret name and key.
                            let secret_name = crate::password::generated_secret_name(
                                &resource.name_any(),
                                &role_spec.name,
                                gen_spec,
                            );
                            let secret_key = crate::password::generated_secret_key(gen_spec);
                            let cleartext = crate::password::generate_password(
                                gen_spec
                                    .length
                                    .unwrap_or(crate::password::DEFAULT_PASSWORD_LENGTH),
                            );

                            crate::password::GeneratedPasswordSecret {
                                password: cleartext,
                                source_version:
                                    crate::password::missing_generated_secret_source_version(
                                        &secret_name,
                                        &secret_key,
                                    ),
                            }
                        }
                    }
                } else {
                    // Apply mode — ensure a Secret exists with a generated password.
                    crate::password::ensure_generated_secret(
                        ctx.kube_client.clone(),
                        namespace,
                        resource,
                        &role_spec.name,
                        gen_spec,
                    )
                    .await
                    .map_err(Box::new)?
                };
                resolved.insert(
                    role_spec.name.clone(),
                    ResolvedPassword {
                        cleartext: password.password,
                        source_version: password.source_version,
                    },
                );
            } else if pw.secret_ref.is_some() {
                // SecretRef mode — read from an existing Secret.
                let password = resolve_password_from_cache(&role_spec.name, pw, &secret_cache)?;
                resolved.insert(role_spec.name.clone(), password);
            }
        }
    }

    Ok(resolved)
}
1664
1665/// Extract a password from a pre-fetched Secret cache for a `secretRef` role.
1666fn resolve_password_from_cache(
1667    role_name: &str,
1668    password_spec: &crate::crd::PasswordSpec,
1669    secret_cache: &std::collections::BTreeMap<String, k8s_openapi::api::core::v1::Secret>,
1670) -> Result<ResolvedPassword, ReconcileError> {
1671    let secret_ref = password_spec.secret_ref.as_ref().ok_or_else(|| {
1672        Box::new(crate::context::ContextError::SecretMissing {
1673            name: "(no secretRef)".to_string(),
1674            key: role_name.to_string(),
1675        })
1676    })?;
1677    let secret_name = &secret_ref.name;
1678    let secret_key = password_spec.secret_key.as_deref().unwrap_or(role_name);
1679
1680    let secret = secret_cache.get(secret_name.as_str()).ok_or_else(|| {
1681        Box::new(crate::context::ContextError::SecretMissing {
1682            name: secret_name.clone(),
1683            key: secret_key.to_string(),
1684        })
1685    })?;
1686
1687    let data = secret.data.as_ref().ok_or_else(|| {
1688        Box::new(crate::context::ContextError::SecretMissing {
1689            name: secret_name.clone(),
1690            key: secret_key.to_string(),
1691        })
1692    })?;
1693
1694    let value_bytes = data.get(secret_key).ok_or_else(|| {
1695        Box::new(crate::context::ContextError::SecretMissing {
1696            name: secret_name.clone(),
1697            key: secret_key.to_string(),
1698        })
1699    })?;
1700
1701    let password = String::from_utf8(value_bytes.0.clone()).map_err(|_| {
1702        Box::new(crate::context::ContextError::SecretMissing {
1703            name: secret_name.clone(),
1704            key: secret_key.to_string(),
1705        })
1706    })?;
1707
1708    if password.is_empty() {
1709        return Err(ReconcileError::EmptyPasswordSecret {
1710            role: role_name.to_string(),
1711            secret: secret_name.clone(),
1712            key: secret_key.to_string(),
1713        });
1714    }
1715
1716    let resource_version = secret
1717        .metadata
1718        .resource_version
1719        .as_deref()
1720        .unwrap_or("unknown");
1721    Ok(ResolvedPassword {
1722        cleartext: password,
1723        source_version: format!("{secret_name}:{secret_key}:{resource_version}"),
1724    })
1725}
1726
/// Resolve `secretRef` passwords purely from an in-memory Secret cache.
///
/// Test-only helper that mirrors the `secretRef` path of
/// `resolve_passwords_from_secrets` without any Kubernetes API calls. Roles
/// without a password, or whose password has no `secretRef`, are skipped.
/// The first resolution error is returned as-is.
#[cfg(test)]
fn resolve_passwords_from_cached_secrets(
    resource: &PostgresPolicy,
    secret_cache: &std::collections::BTreeMap<String, k8s_openapi::api::core::v1::Secret>,
) -> Result<std::collections::BTreeMap<String, ResolvedPassword>, ReconcileError> {
    resource
        .spec
        .roles
        .iter()
        .filter_map(|role| {
            role.password
                .as_ref()
                .filter(|spec| spec.secret_ref.is_some())
                .map(|spec| (role, spec))
        })
        .map(|(role, spec)| {
            resolve_password_from_cache(&role.name, spec, secret_cache)
                .map(|password| (role.name.clone(), password))
        })
        .collect()
}
1744
1745fn select_password_changes(
1746    changes: &[pgroles_core::diff::Change],
1747    resolved_passwords: &std::collections::BTreeMap<String, ResolvedPassword>,
1748    status: Option<&PostgresPolicyStatus>,
1749) -> (
1750    std::collections::BTreeMap<String, String>,
1751    std::collections::BTreeMap<String, String>,
1752) {
1753    let created_roles: std::collections::BTreeSet<&str> = changes
1754        .iter()
1755        .filter_map(|change| match change {
1756            pgroles_core::diff::Change::CreateRole { name, .. } => Some(name.as_str()),
1757            _ => None,
1758        })
1759        .collect();
1760    let previous_versions = status
1761        .map(|status| &status.applied_password_source_versions)
1762        .cloned()
1763        .unwrap_or_default();
1764
1765    let mut password_changes = std::collections::BTreeMap::new();
1766    let mut current_versions = std::collections::BTreeMap::new();
1767
1768    for (role, resolved) in resolved_passwords {
1769        current_versions.insert(role.clone(), resolved.source_version.clone());
1770        if created_roles.contains(role.as_str())
1771            || previous_versions.get(role) != Some(&resolved.source_version)
1772        {
1773            password_changes.insert(role.clone(), resolved.cleartext.clone());
1774        }
1775    }
1776
1777    (password_changes, current_versions)
1778}
1779
1780/// Cleanup on deletion — evict cached pool.
1781async fn reconcile_cleanup(
1782    resource: &PostgresPolicy,
1783    ctx: &OperatorContext,
1784) -> Result<Action, ReconcileError> {
1785    let name = resource.name_any();
1786    let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
1787
1788    info!(name, namespace, "cleaning up (resource deleted)");
1789
1790    // Evict any cached pool for this resource's connection.
1791    ctx.evict_pool(&namespace, &resource.spec.connection).await;
1792
1793    // Note: we do NOT revoke grants on deletion. The resource being deleted
1794    // means the user no longer wants pgroles to manage these roles — it does
1795    // NOT mean "revoke everything". This is the safe default.
1796
1797    Ok(Action::await_change())
1798}
1799
1800/// Accumulate change counts into the summary.
1801fn accumulate_summary(summary: &mut ChangeSummary, change: &pgroles_core::diff::Change) {
1802    use pgroles_core::diff::Change;
1803    match change {
1804        Change::CreateRole { .. } => summary.roles_created += 1,
1805        Change::CreateSchema { .. } => summary.schemas_created += 1,
1806        Change::AlterSchemaOwner { .. } => summary.schema_owners_altered += 1,
1807        Change::AlterRole { .. } => summary.roles_altered += 1,
1808        Change::SetComment { .. } => summary.roles_altered += 1,
1809        Change::DropRole { .. } => summary.roles_dropped += 1,
1810        Change::TerminateSessions { .. } => summary.sessions_terminated += 1,
1811        Change::ReassignOwned { .. } => {}
1812        Change::DropOwned { .. } => {}
1813        Change::Grant { .. } | Change::EnsureSchemaOwnerPrivileges { .. } => {
1814            summary.grants_added += 1
1815        }
1816        Change::Revoke { .. } => summary.grants_revoked += 1,
1817        Change::SetDefaultPrivilege { .. } => summary.default_privileges_set += 1,
1818        Change::RevokeDefaultPrivilege { .. } => summary.default_privileges_revoked += 1,
1819        Change::AddMember { .. } => summary.members_added += 1,
1820        Change::RemoveMember { .. } => summary.members_removed += 1,
1821        Change::SetPassword { .. } => summary.passwords_set += 1,
1822    }
1823}
1824
1825fn summarize_changes(changes: &[pgroles_core::diff::Change]) -> ChangeSummary {
1826    let mut summary = ChangeSummary::default();
1827    for change in changes {
1828        accumulate_summary(&mut summary, change);
1829    }
1830    summary.total = summary.roles_created
1831        + summary.roles_altered
1832        + summary.schemas_created
1833        + summary.schema_owners_altered
1834        + summary.roles_dropped
1835        + summary.sessions_terminated
1836        + summary.grants_added
1837        + summary.grants_revoked
1838        + summary.default_privileges_set
1839        + summary.default_privileges_revoked
1840        + summary.members_added
1841        + summary.members_removed
1842        + summary.passwords_set;
1843    summary
1844}
1845
/// Parse a simplified RFC 3339 / ISO 8601 UTC timestamp
/// (`YYYY-MM-DDTHH:MM:SSZ`) into seconds since the Unix epoch.
///
/// The date/time separator may be `T`, `t`, or a space. Anything between the
/// seconds field and the trailing `Z` (e.g. a fractional-seconds suffix such
/// as `.123`) is ignored.
///
/// Returns `None` in these cases:
/// - the string does not match the expected layout;
/// - a numeric field contains anything other than ASCII digits;
/// - a field is out of range (month 1–12, day 1–31, hour 0–23, minute 0–59,
///   second 0–60);
/// - the instant precedes the Unix epoch, which a `u64` cannot represent.
///
/// Day-of-month is not checked against the length of the specific month
/// (e.g. `02-31` rolls over into March).
fn parse_rfc3339_to_epoch_secs(timestamp: &str) -> Option<u64> {
    // Expected format: "2026-03-31T12:34:56Z"
    if timestamp.len() < 20 || !timestamp.ends_with('Z') {
        return None;
    }

    // Indexing is safe: the length check above guarantees at least 20 bytes.
    let bytes = timestamp.as_bytes();
    let separators_ok = bytes[4] == b'-'
        && bytes[7] == b'-'
        && matches!(bytes[10], b'T' | b't' | b' ')
        && bytes[13] == b':'
        && bytes[16] == b':';
    if !separators_ok {
        return None;
    }

    // Parse a fixed-width, digits-only field. `u64::from_str` on its own
    // would also accept a leading `+`, which is not valid here.
    let field = |range: std::ops::Range<usize>| -> Option<u64> {
        let digits = timestamp.get(range)?;
        if !digits.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        digits.parse().ok()
    };

    let year = field(0..4)?;
    let month = field(5..7)?;
    let day = field(8..10)?;
    let hours = field(11..13)?;
    let minutes = field(14..16)?;
    let seconds = field(17..19)?;

    // Range checks keep the unsigned arithmetic below from underflowing
    // (day 00, year 0000, or any pre-1970 date used to panic in debug builds
    // and wrap in release builds).
    if year < 1970
        || !(1..=12).contains(&month)
        || !(1..=31).contains(&day)
        || hours > 23
        || minutes > 59
        || seconds > 60
    {
        return None;
    }

    // Days since epoch via Howard Hinnant's `days_from_civil`, using a
    // March-based year so the leap day falls at the end of the year.
    let (y, m) = if month <= 2 {
        (year - 1, month + 9)
    } else {
        (year, month - 3)
    };
    let era = y / 400;
    let yoe = y - era * 400;
    let doy = (153 * m + 2) / 5 + day - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    let days_since_epoch = (era * 146097 + doe).checked_sub(719468)?;

    Some(days_since_epoch * 86400 + hours * 3600 + minutes * 60 + seconds)
}
1876
1877async fn detect_sql_context(
1878    pool: &sqlx::PgPool,
1879    inspect_config: &pgroles_inspect::InspectConfig,
1880) -> Result<pgroles_core::sql::SqlContext, ReconcileError> {
1881    let pg_version = pgroles_inspect::detect_pg_version(pool).await?;
1882    let privilege_schemas: Vec<&str> = inspect_config
1883        .privilege_schemas
1884        .iter()
1885        .map(|schema| schema.as_str())
1886        .collect();
1887    let relation_inventory =
1888        pgroles_inspect::fetch_relation_inventory(pool, &privilege_schemas).await?;
1889    Ok(
1890        pgroles_core::sql::SqlContext::from_version_num(pg_version.version_num)
1891            .with_relation_inventory(relation_inventory),
1892    )
1893}
1894
1895fn render_plan_sql_for_status(
1896    changes: &[pgroles_core::diff::Change],
1897    sql_ctx: &pgroles_core::sql::SqlContext,
1898) -> (Option<String>, bool) {
1899    if changes.is_empty() {
1900        return (None, false);
1901    }
1902
1903    // Render each change individually so we can redact passwords.
1904    let rendered: String = changes
1905        .iter()
1906        .flat_map(|change| {
1907            if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
1908                vec![format!(
1909                    "ALTER ROLE {} PASSWORD '[REDACTED]';",
1910                    pgroles_core::sql::quote_ident(name)
1911                )]
1912            } else {
1913                pgroles_core::sql::render_statements_with_context(change, sql_ctx)
1914            }
1915        })
1916        .collect::<Vec<_>>()
1917        .join("\n");
1918
1919    let (truncated, did_truncate) = truncate_status_text(&rendered, MAX_PLANNED_SQL_STATUS_BYTES);
1920    (Some(truncated), did_truncate)
1921}
1922
/// Bound `text` to roughly `max_bytes` bytes for storage in a status field.
///
/// Text that already fits is returned unchanged with `false`. Otherwise the
/// text is cut at the last UTF-8 character boundary that leaves room for a
/// trailing `-- truncated for status --` marker line, and `true` is returned.
/// If `max_bytes` is smaller than the marker itself, the result is just the
/// marker, which is then longer than `max_bytes`.
fn truncate_status_text(text: &str, max_bytes: usize) -> (String, bool) {
    const MARKER: &str = "\n-- truncated for status --";

    if text.len() <= max_bytes {
        return (text.to_owned(), false);
    }

    let budget = max_bytes.saturating_sub(MARKER.len()).min(text.len());
    // Byte 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=budget)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);

    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&text[..cut]);
    out.push_str(MARKER);
    (out, true)
}
1939
1940/// Emit a plan lifecycle event on the parent policy, logging warnings on failure.
1941async fn emit_plan_event(
1942    ctx: &OperatorContext,
1943    policy: &PostgresPolicy,
1944    plan: &PostgresPolicyPlan,
1945    event_type: PlanEventType,
1946) {
1947    if let Err(error) = publish_plan_event(&ctx.event_recorder, policy, plan, event_type).await {
1948        let namespace = policy.namespace().unwrap_or_default();
1949        let name = policy.name_any();
1950        tracing::warn!(
1951            policy = %format!("{namespace}/{name}"),
1952            %error,
1953            "failed to publish plan lifecycle event"
1954        );
1955    }
1956}
1957
1958/// Patch the status sub-resource of a PostgresPolicy.
1959async fn update_status<F>(
1960    ctx: &OperatorContext,
1961    resource: &PostgresPolicy,
1962    mutate: F,
1963) -> Result<(), ReconcileError>
1964where
1965    F: FnOnce(&mut PostgresPolicyStatus),
1966{
1967    let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
1968    let name = resource.name_any();
1969
1970    let api: Api<PostgresPolicy> = Api::namespaced(ctx.kube_client.clone(), &namespace);
1971    let latest = api.get(&name).await?;
1972    let old_status = latest.status.clone();
1973    let mut status = old_status.clone().unwrap_or_default();
1974
1975    mutate(&mut status);
1976
1977    let patch = serde_json::json!({
1978        "status": status
1979    });
1980
1981    api.patch_status(
1982        &name,
1983        &PatchParams::apply("pgroles-operator"),
1984        &Patch::Merge(&patch),
1985    )
1986    .await?;
1987
1988    if let Err(error) =
1989        publish_status_events(&ctx.event_recorder, &latest, old_status.as_ref(), &status).await
1990    {
1991        tracing::warn!(policy = %format!("{namespace}/{name}"), %error, "failed to publish Kubernetes Events");
1992    }
1993
1994    Ok(())
1995}
1996
1997async fn detect_policy_conflict(
1998    ctx: &OperatorContext,
1999    resource: &PostgresPolicy,
2000    identity: &DatabaseIdentity,
2001    ownership: &crate::crd::OwnershipClaims,
2002) -> Result<Option<String>, ReconcileError> {
2003    let api: Api<PostgresPolicy> = Api::all(ctx.kube_client.clone());
2004    let policies = api.list(&Default::default()).await?;
2005
2006    Ok(detect_policy_conflict_in_list(
2007        resource,
2008        identity,
2009        ownership,
2010        policies.into_iter(),
2011    ))
2012}
2013
2014fn detect_policy_conflict_in_list(
2015    resource: &PostgresPolicy,
2016    identity: &DatabaseIdentity,
2017    ownership: &crate::crd::OwnershipClaims,
2018    policies: impl IntoIterator<Item = PostgresPolicy>,
2019) -> Option<String> {
2020    let this_ns = resource.namespace()?;
2021    let this_name = resource.name_any();
2022
2023    let mut conflicts = Vec::new();
2024    for other in policies {
2025        let other_ns = match other.namespace() {
2026            Some(ns) => ns,
2027            None => continue,
2028        };
2029        let other_name = other.name_any();
2030        if other_ns == this_ns && other_name == this_name {
2031            continue;
2032        }
2033
2034        let other_identity = DatabaseIdentity::from_connection(&other_ns, &other.spec.connection);
2035        if &other_identity != identity {
2036            continue;
2037        }
2038
2039        if let Err(error) = other.spec.validate_password_specs(&other_name) {
2040            tracing::warn!(
2041                policy = %format!("{other_ns}/{other_name}"),
2042                database = %identity.as_str(),
2043                %error,
2044                "skipping conflict detection for invalid peer policy"
2045            );
2046            continue;
2047        }
2048
2049        let other_ownership = match other.spec.ownership_claims() {
2050            Ok(claims) => claims,
2051            Err(error) => {
2052                tracing::warn!(
2053                    policy = %format!("{other_ns}/{other_name}"),
2054                    database = %identity.as_str(),
2055                    %error,
2056                    "skipping conflict detection for invalid peer policy"
2057                );
2058                continue;
2059            }
2060        };
2061        if ownership.overlaps(&other_ownership) {
2062            let overlap = ownership.overlap_summary(&other_ownership);
2063            conflicts.push(format!("{other_ns}/{other_name} ({overlap})"));
2064        }
2065    }
2066
2067    if conflicts.is_empty() {
2068        None
2069    } else {
2070        Some(format!(
2071            "policy ownership overlaps with {} on database target {}",
2072            conflicts.join(", "),
2073            identity.as_str()
2074        ))
2075    }
2076}
2077
2078impl ReconcileError {
2079    fn reason(&self) -> &'static str {
2080        match self {
2081            ReconcileError::ManifestExpansion(_)
2082            | ReconcileError::InvalidInterval(_, _)
2083            | ReconcileError::InvalidSpec(_) => "InvalidSpec",
2084            ReconcileError::ConflictingPolicy(_) => "ConflictingPolicy",
2085            ReconcileError::UnsatisfiableWildcardGrant(_) => "UnsatisfiableWildcardGrant",
2086            ReconcileError::LockContention(_, _) => "LockContention",
2087            ReconcileError::Context(context) => match context.as_ref() {
2088                ContextError::SecretFetch { .. } => "SecretFetchFailed",
2089                ContextError::SecretMissing { .. } => "SecretMissing",
2090                ContextError::DatabaseConnect { .. } => "DatabaseConnectionFailed",
2091                ContextError::EmptyResolvedValue { .. } => "InvalidConnectionParams",
2092                ContextError::InvalidResolvedSslMode { .. } => "InvalidConnectionParams",
2093            },
2094            ReconcileError::Inspect(error) => match error {
2095                pgroles_inspect::InspectError::Database(sql_err) => {
2096                    match classify_sqlx_error(sql_err) {
2097                        SqlErrorKind::InsufficientPrivileges => "InsufficientPrivileges",
2098                        SqlErrorKind::MissingDatabaseObject => "MissingDatabaseObject",
2099                        SqlErrorKind::Transient => "DatabaseInspectionFailed",
2100                    }
2101                }
2102            },
2103            ReconcileError::SqlExec(error) => match classify_sqlx_error(error) {
2104                SqlErrorKind::InsufficientPrivileges => "InsufficientPrivileges",
2105                SqlErrorKind::MissingDatabaseObject => "MissingDatabaseObject",
2106                SqlErrorKind::Transient => "ApplyFailed",
2107            },
2108            ReconcileError::UnsafeRoleDrops(_) => "UnsafeRoleDrops",
2109            ReconcileError::EmptyPasswordSecret { .. } => "InvalidSpec",
2110            ReconcileError::MissingDatabaseObjects(_) => "MissingDatabaseObject",
2111            ReconcileError::PasswordGeneration(_) => "SecretFetchFailed",
2112            ReconcileError::PlanSqlStorage(_) => "PlanSqlStorageFailed",
2113            ReconcileError::Kube(_) => "KubernetesApiError",
2114            ReconcileError::NoNamespace => "InvalidResource",
2115        }
2116    }
2117}
2118
2119// ---------------------------------------------------------------------------
2120// Tests
2121// ---------------------------------------------------------------------------
2122
2123#[cfg(test)]
2124mod tests {
2125    use super::*;
2126    use crate::crd::{
2127        ConnectionSpec, CrdReconciliationMode, PasswordSpec, PolicyMode, PostgresPolicySpec,
2128        RoleSpec, SecretReference,
2129    };
2130    use k8s_openapi::{
2131        ByteString, api::core::v1::Secret, apimachinery::pkg::apis::meta::v1::ObjectMeta,
2132    };
2133    use sqlx::error::{DatabaseError, ErrorKind};
2134    use std::borrow::Cow;
2135    use std::collections::BTreeMap;
2136    use std::error::Error as StdError;
2137    use std::fmt;
2138
2139    #[derive(Debug)]
2140    struct TestDatabaseError {
2141        message: String,
2142        code: Option<&'static str>,
2143    }
2144
2145    impl fmt::Display for TestDatabaseError {
2146        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2147            f.write_str(&self.message)
2148        }
2149    }
2150
2151    impl StdError for TestDatabaseError {}
2152
2153    impl DatabaseError for TestDatabaseError {
2154        fn message(&self) -> &str {
2155            &self.message
2156        }
2157
2158        fn code(&self) -> Option<Cow<'_, str>> {
2159            self.code.map(Cow::Borrowed)
2160        }
2161
2162        fn as_error(&self) -> &(dyn StdError + Send + Sync + 'static) {
2163            self
2164        }
2165
2166        fn as_error_mut(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) {
2167            self
2168        }
2169
2170        fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static> {
2171            self
2172        }
2173
2174        fn kind(&self) -> ErrorKind {
2175            ErrorKind::Other
2176        }
2177    }
2178
2179    fn insufficient_privilege_sqlx_error() -> sqlx::Error {
2180        sqlx::Error::Database(Box::new(TestDatabaseError {
2181            message: "permission denied to create role".to_string(),
2182            code: Some(SQLSTATE_INSUFFICIENT_PRIVILEGE),
2183        }))
2184    }
2185
2186    fn missing_schema_sqlx_error() -> sqlx::Error {
2187        sqlx::Error::Database(Box::new(TestDatabaseError {
2188            message: "schema \"etl\" does not exist".to_string(),
2189            code: Some(SQLSTATE_INVALID_SCHEMA_NAME),
2190        }))
2191    }
2192
2193    fn missing_table_sqlx_error() -> sqlx::Error {
2194        sqlx::Error::Database(Box::new(TestDatabaseError {
2195            message: "relation \"foo\" does not exist".to_string(),
2196            code: Some(SQLSTATE_UNDEFINED_TABLE),
2197        }))
2198    }
2199
2200    fn missing_function_sqlx_error() -> sqlx::Error {
2201        sqlx::Error::Database(Box::new(TestDatabaseError {
2202            message: "function foo() does not exist".to_string(),
2203            code: Some(SQLSTATE_UNDEFINED_FUNCTION),
2204        }))
2205    }
2206
2207    fn missing_object_sqlx_error() -> sqlx::Error {
2208        sqlx::Error::Database(Box::new(TestDatabaseError {
2209            message: "role \"nope\" does not exist".to_string(),
2210            code: Some(SQLSTATE_UNDEFINED_OBJECT),
2211        }))
2212    }
2213
2214    fn transient_sqlx_error() -> sqlx::Error {
2215        sqlx::Error::Database(Box::new(TestDatabaseError {
2216            message: "connection timed out".to_string(),
2217            code: Some("08006"),
2218        }))
2219    }
2220
2221    fn test_policy(interval: &str, transient_failure_count: i32) -> Arc<PostgresPolicy> {
2222        let spec = PostgresPolicySpec {
2223            connection: ConnectionSpec {
2224                secret_ref: Some(SecretReference {
2225                    name: "db-credentials".to_string(),
2226                }),
2227                secret_key: Some("DATABASE_URL".to_string()),
2228                params: None,
2229            },
2230            interval: interval.to_string(),
2231            suspend: false,
2232            mode: PolicyMode::Apply,
2233            reconciliation_mode: CrdReconciliationMode::default(),
2234            default_owner: None,
2235            profiles: Default::default(),
2236            schemas: Vec::new(),
2237            roles: Vec::new(),
2238            grants: Vec::new(),
2239            default_privileges: Vec::new(),
2240            memberships: Vec::new(),
2241            retirements: Vec::new(),
2242            approval: None,
2243        };
2244        let mut resource = PostgresPolicy::new("example", spec);
2245        resource.metadata.namespace = Some("default".to_string());
2246        resource.status = Some(PostgresPolicyStatus {
2247            transient_failure_count,
2248            ..Default::default()
2249        });
2250        Arc::new(resource)
2251    }
2252
2253    fn test_policy_with_spec(name: &str, spec: PostgresPolicySpec) -> PostgresPolicy {
2254        let mut resource = PostgresPolicy::new(name, spec);
2255        resource.metadata.namespace = Some("default".to_string());
2256        resource
2257    }
2258
2259    fn valid_role_policy(name: &str, role_name: &str, secret_name: &str) -> PostgresPolicy {
2260        test_policy_with_spec(
2261            name,
2262            PostgresPolicySpec {
2263                connection: ConnectionSpec {
2264                    secret_ref: Some(SecretReference {
2265                        name: secret_name.to_string(),
2266                    }),
2267                    secret_key: Some("DATABASE_URL".to_string()),
2268                    params: None,
2269                },
2270                interval: "5m".to_string(),
2271                suspend: false,
2272                mode: PolicyMode::Apply,
2273                reconciliation_mode: CrdReconciliationMode::default(),
2274                default_owner: None,
2275                profiles: Default::default(),
2276                schemas: Vec::new(),
2277                roles: vec![RoleSpec {
2278                    name: role_name.to_string(),
2279                    login: Some(true),
2280                    superuser: None,
2281                    createdb: None,
2282                    createrole: None,
2283                    inherit: None,
2284                    replication: None,
2285                    bypassrls: None,
2286                    connection_limit: None,
2287                    comment: None,
2288                    password: None,
2289                    password_valid_until: None,
2290                }],
2291                grants: Vec::new(),
2292                default_privileges: Vec::new(),
2293                memberships: Vec::new(),
2294                retirements: Vec::new(),
2295                approval: None,
2296            },
2297        )
2298    }
2299
2300    fn invalid_profile_policy(name: &str, secret_name: &str) -> PostgresPolicy {
2301        test_policy_with_spec(
2302            name,
2303            PostgresPolicySpec {
2304                connection: ConnectionSpec {
2305                    secret_ref: Some(SecretReference {
2306                        name: secret_name.to_string(),
2307                    }),
2308                    secret_key: Some("DATABASE_URL".to_string()),
2309                    params: None,
2310                },
2311                interval: "5m".to_string(),
2312                suspend: false,
2313                mode: PolicyMode::Apply,
2314                reconciliation_mode: CrdReconciliationMode::default(),
2315                default_owner: None,
2316                profiles: Default::default(),
2317                schemas: vec![pgroles_core::manifest::SchemaBinding {
2318                    name: "reporting".to_string(),
2319                    profiles: vec!["missing-profile".to_string()],
2320                    role_pattern: "{schema}-{profile}".to_string(),
2321                    owner: None,
2322                }],
2323                roles: Vec::new(),
2324                grants: Vec::new(),
2325                default_privileges: Vec::new(),
2326                memberships: Vec::new(),
2327                retirements: Vec::new(),
2328                approval: None,
2329            },
2330        )
2331    }
2332
2333    fn password_role_policy() -> PostgresPolicy {
2334        test_policy_with_spec(
2335            "password-policy",
2336            PostgresPolicySpec {
2337                connection: ConnectionSpec {
2338                    secret_ref: Some(SecretReference {
2339                        name: "db-credentials".to_string(),
2340                    }),
2341                    secret_key: Some("DATABASE_URL".to_string()),
2342                    params: None,
2343                },
2344                interval: "5m".to_string(),
2345                suspend: false,
2346                mode: PolicyMode::Apply,
2347                reconciliation_mode: CrdReconciliationMode::default(),
2348                default_owner: None,
2349                profiles: Default::default(),
2350                schemas: Vec::new(),
2351                roles: vec![
2352                    RoleSpec {
2353                        name: "app".to_string(),
2354                        login: Some(true),
2355                        superuser: None,
2356                        createdb: None,
2357                        createrole: None,
2358                        inherit: None,
2359                        replication: None,
2360                        bypassrls: None,
2361                        connection_limit: None,
2362                        comment: None,
2363                        password: Some(PasswordSpec {
2364                            secret_ref: Some(SecretReference {
2365                                name: "role-passwords".to_string(),
2366                            }),
2367                            secret_key: None,
2368                            generate: None,
2369                        }),
2370                        password_valid_until: None,
2371                    },
2372                    RoleSpec {
2373                        name: "reporter".to_string(),
2374                        login: Some(true),
2375                        superuser: None,
2376                        createdb: None,
2377                        createrole: None,
2378                        inherit: None,
2379                        replication: None,
2380                        bypassrls: None,
2381                        connection_limit: None,
2382                        comment: None,
2383                        password: Some(PasswordSpec {
2384                            secret_ref: Some(SecretReference {
2385                                name: "role-passwords".to_string(),
2386                            }),
2387                            secret_key: Some("reporter-password".to_string()),
2388                            generate: None,
2389                        }),
2390                        password_valid_until: None,
2391                    },
2392                ],
2393                grants: Vec::new(),
2394                default_privileges: Vec::new(),
2395                memberships: Vec::new(),
2396                retirements: Vec::new(),
2397                approval: None,
2398            },
2399        )
2400    }
2401
2402    fn secret_with_keys(name: &str, entries: &[(&str, &str)]) -> Secret {
2403        secret_with_keys_and_version(name, "1", entries)
2404    }
2405
2406    fn secret_with_keys_and_version(
2407        name: &str,
2408        resource_version: &str,
2409        entries: &[(&str, &str)],
2410    ) -> Secret {
2411        Secret {
2412            metadata: ObjectMeta {
2413                name: Some(name.to_string()),
2414                resource_version: Some(resource_version.to_string()),
2415                ..Default::default()
2416            },
2417            data: Some(
2418                entries
2419                    .iter()
2420                    .map(|(key, value)| ((*key).to_string(), ByteString(value.as_bytes().to_vec())))
2421                    .collect(),
2422            ),
2423            ..Default::default()
2424        }
2425    }
2426
2427    #[test]
2428    fn parse_interval_minutes() {
2429        let d = parse_interval("5m").unwrap();
2430        assert_eq!(d, Duration::from_secs(300));
2431    }
2432
2433    #[test]
2434    fn parse_interval_hours() {
2435        let d = parse_interval("1h").unwrap();
2436        assert_eq!(d, Duration::from_secs(3600));
2437    }
2438
2439    #[test]
2440    fn parse_interval_seconds() {
2441        let d = parse_interval("30s").unwrap();
2442        assert_eq!(d, Duration::from_secs(30));
2443    }
2444
2445    #[test]
2446    fn parse_interval_compound() {
2447        let d = parse_interval("1h30m").unwrap();
2448        assert_eq!(d, Duration::from_secs(5400));
2449    }
2450
2451    #[test]
2452    fn parse_interval_empty_uses_default() {
2453        let d = parse_interval("").unwrap();
2454        assert_eq!(d, Duration::from_secs(DEFAULT_REQUEUE_SECS));
2455    }
2456
2457    #[test]
2458    fn parse_interval_bare_number_treated_as_seconds() {
2459        let d = parse_interval("120").unwrap();
2460        assert_eq!(d, Duration::from_secs(120));
2461    }
2462
2463    #[test]
2464    fn parse_interval_invalid_unit() {
2465        let result = parse_interval("5x");
2466        assert!(result.is_err());
2467    }
2468
2469    #[test]
2470    fn accumulate_summary_counts() {
2471        use pgroles_core::diff::Change;
2472        use pgroles_core::model::RoleState;
2473
2474        let mut summary = ChangeSummary::default();
2475
2476        accumulate_summary(
2477            &mut summary,
2478            &Change::CreateRole {
2479                name: "test".to_string(),
2480                state: RoleState {
2481                    login: true,
2482                    ..RoleState::default()
2483                },
2484            },
2485        );
2486        accumulate_summary(
2487            &mut summary,
2488            &Change::Grant {
2489                role: "test".to_string(),
2490                object_type: pgroles_core::manifest::ObjectType::Schema,
2491                schema: None,
2492                name: Some("public".to_string()),
2493                privileges: [pgroles_core::manifest::Privilege::Usage]
2494                    .into_iter()
2495                    .collect(),
2496            },
2497        );
2498        accumulate_summary(
2499            &mut summary,
2500            &Change::TerminateSessions {
2501                role: "test".to_string(),
2502            },
2503        );
2504
2505        assert_eq!(summary.roles_created, 1);
2506        assert_eq!(summary.grants_added, 1);
2507        assert_eq!(summary.sessions_terminated, 1);
2508    }
2509
2510    #[test]
2511    fn accumulate_summary_counts_schema_changes_separately() {
2512        use pgroles_core::diff::Change;
2513
2514        let mut summary = ChangeSummary::default();
2515
2516        accumulate_summary(
2517            &mut summary,
2518            &Change::CreateSchema {
2519                name: "inventory".to_string(),
2520                owner: Some("inventory_owner".to_string()),
2521            },
2522        );
2523        accumulate_summary(
2524            &mut summary,
2525            &Change::AlterSchemaOwner {
2526                name: "catalog".to_string(),
2527                owner: "catalog_owner".to_string(),
2528            },
2529        );
2530
2531        assert_eq!(summary.schemas_created, 1);
2532        assert_eq!(summary.schema_owners_altered, 1);
2533        assert_eq!(summary.grants_added, 0);
2534    }
2535
2536    #[test]
2537    fn summarize_changes_sets_total() {
2538        use pgroles_core::diff::Change;
2539        use pgroles_core::model::RoleState;
2540
2541        let changes = vec![
2542            Change::CreateRole {
2543                name: "test".to_string(),
2544                state: RoleState::default(),
2545            },
2546            Change::CreateSchema {
2547                name: "inventory".to_string(),
2548                owner: Some("inventory_owner".to_string()),
2549            },
2550            Change::Grant {
2551                role: "test".to_string(),
2552                object_type: pgroles_core::manifest::ObjectType::Schema,
2553                schema: None,
2554                name: Some("public".to_string()),
2555                privileges: [pgroles_core::manifest::Privilege::Usage]
2556                    .into_iter()
2557                    .collect(),
2558            },
2559        ];
2560
2561        let summary = summarize_changes(&changes);
2562        assert_eq!(summary.roles_created, 1);
2563        assert_eq!(summary.schemas_created, 1);
2564        assert_eq!(summary.grants_added, 1);
2565        assert_eq!(summary.total, 3);
2566    }
2567
2568    #[test]
2569    fn truncate_status_text_marks_truncation() {
2570        let text = "x".repeat(MAX_PLANNED_SQL_STATUS_BYTES + 32);
2571        let (truncated, did_truncate) = truncate_status_text(&text, MAX_PLANNED_SQL_STATUS_BYTES);
2572        assert!(did_truncate);
2573        assert!(truncated.len() <= MAX_PLANNED_SQL_STATUS_BYTES);
2574        assert!(truncated.ends_with("-- truncated for status --"));
2575    }
2576
2577    #[test]
2578    fn accumulate_summary_all_change_types() {
2579        use pgroles_core::diff::Change;
2580        use pgroles_core::model::RoleState;
2581
2582        let mut summary = ChangeSummary::default();
2583
2584        accumulate_summary(
2585            &mut summary,
2586            &Change::CreateRole {
2587                name: "r1".to_string(),
2588                state: RoleState::default(),
2589            },
2590        );
2591        accumulate_summary(
2592            &mut summary,
2593            &Change::AlterRole {
2594                name: "r1".to_string(),
2595                attributes: vec![pgroles_core::model::RoleAttribute::Login(true)],
2596            },
2597        );
2598        accumulate_summary(
2599            &mut summary,
2600            &Change::CreateSchema {
2601                name: "schema1".to_string(),
2602                owner: Some("owner1".to_string()),
2603            },
2604        );
2605        accumulate_summary(
2606            &mut summary,
2607            &Change::AlterSchemaOwner {
2608                name: "schema2".to_string(),
2609                owner: "owner2".to_string(),
2610            },
2611        );
2612        accumulate_summary(
2613            &mut summary,
2614            &Change::SetComment {
2615                name: "r1".to_string(),
2616                comment: Some("comment".to_string()),
2617            },
2618        );
2619        accumulate_summary(
2620            &mut summary,
2621            &Change::DropRole {
2622                name: "r1".to_string(),
2623            },
2624        );
2625        accumulate_summary(
2626            &mut summary,
2627            &Change::TerminateSessions {
2628                role: "r1".to_string(),
2629            },
2630        );
2631        accumulate_summary(
2632            &mut summary,
2633            &Change::ReassignOwned {
2634                from_role: "r1".to_string(),
2635                to_role: "r2".to_string(),
2636            },
2637        );
2638        accumulate_summary(
2639            &mut summary,
2640            &Change::DropOwned {
2641                role: "r1".to_string(),
2642            },
2643        );
2644        accumulate_summary(
2645            &mut summary,
2646            &Change::Grant {
2647                role: "r1".to_string(),
2648                object_type: pgroles_core::manifest::ObjectType::Table,
2649                schema: Some("public".to_string()),
2650                name: Some("*".to_string()),
2651                privileges: [pgroles_core::manifest::Privilege::Select]
2652                    .into_iter()
2653                    .collect(),
2654            },
2655        );
2656        accumulate_summary(
2657            &mut summary,
2658            &Change::Revoke {
2659                role: "r1".to_string(),
2660                object_type: pgroles_core::manifest::ObjectType::Table,
2661                schema: Some("public".to_string()),
2662                name: Some("*".to_string()),
2663                privileges: [pgroles_core::manifest::Privilege::Select]
2664                    .into_iter()
2665                    .collect(),
2666            },
2667        );
2668        accumulate_summary(
2669            &mut summary,
2670            &Change::SetDefaultPrivilege {
2671                schema: "public".to_string(),
2672                owner: "owner".to_string(),
2673                grantee: "r1".to_string(),
2674                on_type: pgroles_core::manifest::ObjectType::Table,
2675                privileges: [pgroles_core::manifest::Privilege::Select]
2676                    .into_iter()
2677                    .collect(),
2678            },
2679        );
2680        accumulate_summary(
2681            &mut summary,
2682            &Change::RevokeDefaultPrivilege {
2683                schema: "public".to_string(),
2684                owner: "owner".to_string(),
2685                grantee: "r1".to_string(),
2686                on_type: pgroles_core::manifest::ObjectType::Table,
2687                privileges: [pgroles_core::manifest::Privilege::Select]
2688                    .into_iter()
2689                    .collect(),
2690            },
2691        );
2692        accumulate_summary(
2693            &mut summary,
2694            &Change::AddMember {
2695                role: "r1".to_string(),
2696                member: "r2".to_string(),
2697                inherit: true,
2698                admin: false,
2699            },
2700        );
2701        accumulate_summary(
2702            &mut summary,
2703            &Change::RemoveMember {
2704                role: "r1".to_string(),
2705                member: "r2".to_string(),
2706            },
2707        );
2708
2709        assert_eq!(summary.roles_created, 1);
2710        // AlterRole + SetComment both increment roles_altered
2711        assert_eq!(summary.roles_altered, 2);
2712        assert_eq!(summary.schemas_created, 1);
2713        assert_eq!(summary.schema_owners_altered, 1);
2714        assert_eq!(summary.roles_dropped, 1);
2715        assert_eq!(summary.sessions_terminated, 1);
2716        assert_eq!(summary.grants_added, 1);
2717        assert_eq!(summary.grants_revoked, 1);
2718        assert_eq!(summary.default_privileges_set, 1);
2719        assert_eq!(summary.default_privileges_revoked, 1);
2720        assert_eq!(summary.members_added, 1);
2721        assert_eq!(summary.members_removed, 1);
2722    }
2723
2724    #[test]
2725    fn error_reason_invalid_spec_for_manifest_expansion() {
2726        let err = ReconcileError::ManifestExpansion(
2727            pgroles_core::manifest::ManifestError::UndefinedProfile("bad".into(), "schema1".into()),
2728        );
2729        assert_eq!(err.reason(), "InvalidSpec");
2730    }
2731
2732    #[test]
2733    fn error_reason_invalid_spec_for_invalid_interval() {
2734        let err = ReconcileError::InvalidInterval("5x".into(), "unknown unit 'x'".into());
2735        assert_eq!(err.reason(), "InvalidSpec");
2736    }
2737
2738    #[test]
2739    fn error_reason_invalid_spec_for_password_validation() {
2740        let err = ReconcileError::InvalidSpec("role password must set exactly one mode".into());
2741        assert_eq!(err.reason(), "InvalidSpec");
2742    }
2743
2744    #[test]
2745    fn error_reason_missing_database_objects() {
2746        let err = ReconcileError::MissingDatabaseObjects("schema \"etl\"".into());
2747        assert_eq!(err.reason(), "MissingDatabaseObject");
2748    }
2749
2750    #[test]
2751    fn error_reason_unsatisfiable_wildcard_grant() {
2752        let err = ReconcileError::UnsatisfiableWildcardGrant(
2753            "UnsatisfiableWildcardGrant: function f2() is not grantable".into(),
2754        );
2755        assert_eq!(err.reason(), "UnsatisfiableWildcardGrant");
2756        assert!(err.to_string().contains("UnsatisfiableWildcardGrant"));
2757    }
2758
2759    #[test]
2760    fn unsatisfiable_wildcard_status_is_degraded_without_plan_reference() {
2761        let message = "UnsatisfiableWildcardGrant: cannot fully satisfy wildcard grant EXECUTE ON function * IN SCHEMA \"app\" TO \"reader\" as executor \"app_owner\"; 1 matching object(s) are missing the desired privilege and are not grantable (examples: \"f2()\" owned by \"definer\" missing [EXECUTE])";
2762        let mut status = PostgresPolicyStatus {
2763            conditions: vec![
2764                ready_condition(true, "Planned", "Plan computed"),
2765                conflict_condition("ConflictingPolicy", "Policy overlaps another policy"),
2766                reconciling_condition("Reconciliation in progress"),
2767                drifted_condition(true, "DriftDetected", "1 planned change pending"),
2768            ],
2769            change_summary: Some(ChangeSummary {
2770                grants_added: 1,
2771                total: 1,
2772                ..Default::default()
2773            }),
2774            planned_sql: Some(
2775                "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA \"app\" TO \"reader\";".into(),
2776            ),
2777            planned_sql_truncated: true,
2778            last_error: None,
2779            transient_failure_count: 3,
2780            current_plan_ref: Some(crate::crd::PlanReference {
2781                name: "example-plan".into(),
2782            }),
2783            ..Default::default()
2784        };
2785
2786        mark_reconcile_failure_status(
2787            &mut status,
2788            "UnsatisfiableWildcardGrant",
2789            message,
2790            false,
2791            true,
2792        );
2793
2794        let ready = status
2795            .conditions
2796            .iter()
2797            .find(|condition| condition.condition_type == "Ready")
2798            .expect("Ready condition should be present");
2799        assert_eq!(ready.status, "False");
2800        assert_eq!(ready.reason.as_deref(), Some("UnsatisfiableWildcardGrant"));
2801        assert_eq!(ready.message.as_deref(), Some(message));
2802
2803        let degraded = status
2804            .conditions
2805            .iter()
2806            .find(|condition| condition.condition_type == "Degraded")
2807            .expect("Degraded condition should be present");
2808        assert_eq!(degraded.status, "True");
2809        assert_eq!(
2810            degraded.reason.as_deref(),
2811            Some("UnsatisfiableWildcardGrant")
2812        );
2813        assert_eq!(degraded.message.as_deref(), Some(message));
2814
2815        assert!(
2816            status.conditions.iter().all(|condition| {
2817                condition.condition_type != "Reconciling"
2818                    && condition.condition_type != "Drifted"
2819                    && condition.condition_type != "Conflict"
2820            }),
2821            "transient planning and stale conflict conditions should be cleared on degraded status"
2822        );
2823        assert!(status.change_summary.is_none());
2824        assert!(status.planned_sql.is_none());
2825        assert!(!status.planned_sql_truncated);
2826        assert!(status.current_plan_ref.is_none());
2827        assert_eq!(status.last_error.as_deref(), Some(message));
2828        assert_eq!(status.transient_failure_count, 0);
2829    }
2830
2831    #[test]
2832    fn reconcile_failure_status_preserves_plan_reference_when_requested() {
2833        let mut status = PostgresPolicyStatus {
2834            current_plan_ref: Some(crate::crd::PlanReference {
2835                name: "approved-plan".into(),
2836            }),
2837            planned_sql: Some("ALTER ROLE \"app\" LOGIN;".into()),
2838            planned_sql_truncated: true,
2839            transient_failure_count: 2,
2840            ..Default::default()
2841        };
2842
2843        mark_reconcile_failure_status(
2844            &mut status,
2845            "ApplyFailed",
2846            "SQL execution error: connection closed",
2847            true,
2848            false,
2849        );
2850
2851        assert_eq!(
2852            status
2853                .current_plan_ref
2854                .as_ref()
2855                .map(|plan| plan.name.as_str()),
2856            Some("approved-plan")
2857        );
2858        assert!(status.planned_sql.is_none());
2859        assert!(!status.planned_sql_truncated);
2860        assert_eq!(
2861            status.last_error.as_deref(),
2862            Some("SQL execution error: connection closed")
2863        );
2864        assert_eq!(status.transient_failure_count, 3);
2865    }
2866
2867    #[test]
2868    fn error_display_missing_database_objects_lists_schemas() {
2869        let err = ReconcileError::MissingDatabaseObjects("schema \"etl\", schema \"jobs\"".into());
2870        let msg = err.to_string();
2871        assert!(msg.contains("schema \"etl\""));
2872        assert!(msg.contains("schema \"jobs\""));
2873        assert!(
2874            msg.contains("pointing at the intended database"),
2875            "message should include remediation hint"
2876        );
2877    }
2878
2879    #[test]
2880    fn referenced_schema_names_from_schema_grants() {
2881        use pgroles_core::manifest::{
2882            ExpandedManifest, Grant, ObjectTarget, ObjectType, Privilege,
2883        };
2884        let expanded = ExpandedManifest {
2885            schemas: Vec::new(),
2886            roles: Vec::new(),
2887            grants: vec![Grant {
2888                role: "app".into(),
2889                privileges: vec![Privilege::Usage],
2890                object: ObjectTarget {
2891                    object_type: ObjectType::Schema,
2892                    schema: None,
2893                    name: Some("etl".into()),
2894                },
2895            }],
2896            default_privileges: Vec::new(),
2897            memberships: Vec::new(),
2898        };
2899        let names = referenced_schema_names(&expanded);
2900        assert!(names.contains("etl"));
2901    }
2902
2903    #[test]
2904    fn referenced_schema_names_from_table_grants() {
2905        use pgroles_core::manifest::{
2906            ExpandedManifest, Grant, ObjectTarget, ObjectType, Privilege,
2907        };
2908        let expanded = ExpandedManifest {
2909            schemas: Vec::new(),
2910            roles: Vec::new(),
2911            grants: vec![Grant {
2912                role: "app".into(),
2913                privileges: vec![Privilege::Select],
2914                object: ObjectTarget {
2915                    object_type: ObjectType::Table,
2916                    schema: Some("analytics".into()),
2917                    name: Some("*".into()),
2918                },
2919            }],
2920            default_privileges: Vec::new(),
2921            memberships: Vec::new(),
2922        };
2923        let names = referenced_schema_names(&expanded);
2924        assert!(names.contains("analytics"));
2925    }
2926
2927    #[test]
2928    fn referenced_schema_names_from_default_privileges() {
2929        use pgroles_core::manifest::{
2930            DefaultPrivilege, DefaultPrivilegeGrant, ExpandedManifest, ObjectType, Privilege,
2931        };
2932        let expanded = ExpandedManifest {
2933            schemas: Vec::new(),
2934            roles: Vec::new(),
2935            grants: Vec::new(),
2936            default_privileges: vec![DefaultPrivilege {
2937                owner: Some("app_owner".into()),
2938                schema: "reporting".into(),
2939                grant: vec![DefaultPrivilegeGrant {
2940                    role: Some("app".into()),
2941                    privileges: vec![Privilege::Select],
2942                    on_type: ObjectType::Table,
2943                }],
2944            }],
2945            memberships: Vec::new(),
2946        };
2947        let names = referenced_schema_names(&expanded);
2948        assert!(names.contains("reporting"));
2949    }
2950
2951    #[test]
2952    fn referenced_schema_names_deduplicates_across_sources() {
2953        use pgroles_core::manifest::{
2954            DefaultPrivilege, DefaultPrivilegeGrant, ExpandedManifest, Grant, ObjectTarget,
2955            ObjectType, Privilege,
2956        };
2957        let expanded = ExpandedManifest {
2958            schemas: Vec::new(),
2959            roles: Vec::new(),
2960            grants: vec![
2961                Grant {
2962                    role: "app".into(),
2963                    privileges: vec![Privilege::Usage],
2964                    object: ObjectTarget {
2965                        object_type: ObjectType::Schema,
2966                        schema: None,
2967                        name: Some("shared".into()),
2968                    },
2969                },
2970                Grant {
2971                    role: "app".into(),
2972                    privileges: vec![Privilege::Select],
2973                    object: ObjectTarget {
2974                        object_type: ObjectType::Table,
2975                        schema: Some("shared".into()),
2976                        name: Some("*".into()),
2977                    },
2978                },
2979            ],
2980            default_privileges: vec![DefaultPrivilege {
2981                owner: Some("app_owner".into()),
2982                schema: "shared".into(),
2983                grant: vec![DefaultPrivilegeGrant {
2984                    role: Some("app".into()),
2985                    privileges: vec![Privilege::Select],
2986                    on_type: ObjectType::Table,
2987                }],
2988            }],
2989            memberships: Vec::new(),
2990        };
2991        let names = referenced_schema_names(&expanded);
2992        // BTreeSet deduplicates so a schema referenced three ways appears once.
2993        assert_eq!(names.len(), 1);
2994        assert!(names.contains("shared"));
2995    }
2996
2997    #[test]
2998    fn referenced_schema_names_skips_database_and_roleless_grants() {
2999        use pgroles_core::manifest::{
3000            ExpandedManifest, Grant, ObjectTarget, ObjectType, Privilege,
3001        };
3002        let expanded = ExpandedManifest {
3003            schemas: Vec::new(),
3004            roles: Vec::new(),
3005            grants: vec![Grant {
3006                role: "app".into(),
3007                privileges: vec![Privilege::Connect],
3008                object: ObjectTarget {
3009                    object_type: ObjectType::Database,
3010                    schema: None,
3011                    name: Some("mydb".into()),
3012                },
3013            }],
3014            default_privileges: Vec::new(),
3015            memberships: Vec::new(),
3016        };
3017        let names = referenced_schema_names(&expanded);
3018        assert!(
3019            names.is_empty(),
3020            "database-level grants should not contribute schema names"
3021        );
3022    }
3023
3024    #[test]
3025    fn is_system_schema_identifies_pg_and_information_schema() {
3026        assert!(is_system_schema("pg_catalog"));
3027        assert!(is_system_schema("pg_toast"));
3028        assert!(is_system_schema("pg_temp_1"));
3029        assert!(is_system_schema("information_schema"));
3030        assert!(!is_system_schema("public"));
3031        assert!(!is_system_schema("etl"));
3032        assert!(!is_system_schema("analytics"));
3033    }
3034
3035    #[test]
3036    fn referenced_schema_names_include_declared_schemas() {
3037        use pgroles_core::manifest::{ExpandedManifest, ExpandedSchema};
3038
3039        let expanded = ExpandedManifest {
3040            schemas: vec![ExpandedSchema {
3041                name: "cdc".into(),
3042                owner: Some("cdc_owner".into()),
3043            }],
3044            roles: Vec::new(),
3045            grants: Vec::new(),
3046            default_privileges: Vec::new(),
3047            memberships: Vec::new(),
3048        };
3049
3050        let names = referenced_schema_names(&expanded);
3051        assert!(names.contains("cdc"));
3052    }
3053
3054    #[test]
3055    fn declared_schema_names_returns_declared_only() {
3056        use pgroles_core::manifest::{ExpandedManifest, ExpandedSchema};
3057
3058        let expanded = ExpandedManifest {
3059            schemas: vec![ExpandedSchema {
3060                name: "cdc".into(),
3061                owner: Some("cdc_owner".into()),
3062            }],
3063            roles: Vec::new(),
3064            grants: Vec::new(),
3065            default_privileges: Vec::new(),
3066            memberships: Vec::new(),
3067        };
3068
3069        let names = declared_schema_names(&expanded);
3070        assert_eq!(names.len(), 1);
3071        assert!(names.contains("cdc"));
3072    }
3073
3074    #[test]
3075    fn externally_required_schema_names_excludes_declared_schemas() {
3076        use pgroles_core::manifest::{
3077            ExpandedManifest, ExpandedSchema, Grant, ObjectTarget, ObjectType, Privilege,
3078        };
3079
3080        let expanded = ExpandedManifest {
3081            schemas: vec![ExpandedSchema {
3082                name: "managed".into(),
3083                owner: Some("managed_owner".into()),
3084            }],
3085            roles: Vec::new(),
3086            grants: vec![
3087                Grant {
3088                    role: "app".into(),
3089                    privileges: vec![Privilege::Usage],
3090                    object: ObjectTarget {
3091                        object_type: ObjectType::Schema,
3092                        schema: None,
3093                        name: Some("managed".into()),
3094                    },
3095                },
3096                Grant {
3097                    role: "app".into(),
3098                    privileges: vec![Privilege::Select],
3099                    object: ObjectTarget {
3100                        object_type: ObjectType::Table,
3101                        schema: Some("external".into()),
3102                        name: Some("*".into()),
3103                    },
3104                },
3105            ],
3106            default_privileges: Vec::new(),
3107            memberships: Vec::new(),
3108        };
3109
3110        let names = externally_required_schema_names(&expanded);
3111        assert_eq!(names.len(), 1);
3112        assert!(names.contains("external"));
3113        assert!(!names.contains("managed"));
3114    }
3115
3116    #[test]
3117    fn error_reason_conflicting_policy() {
3118        let err = ReconcileError::ConflictingPolicy("overlaps with other".into());
3119        assert_eq!(err.reason(), "ConflictingPolicy");
3120    }
3121
3122    #[test]
3123    fn error_reason_unsafe_role_drops() {
3124        let err = ReconcileError::UnsafeRoleDrops("role owns objects".into());
3125        assert_eq!(err.reason(), "UnsafeRoleDrops");
3126    }
3127
3128    #[test]
3129    fn error_reason_no_namespace() {
3130        let err = ReconcileError::NoNamespace;
3131        assert_eq!(err.reason(), "InvalidResource");
3132    }
3133
3134    #[test]
3135    fn error_reason_context_secret_missing() {
3136        let err = ReconcileError::Context(Box::new(crate::context::ContextError::SecretMissing {
3137            name: "pg-secret".into(),
3138            key: "DATABASE_URL".into(),
3139        }));
3140        assert_eq!(err.reason(), "SecretMissing");
3141    }
3142
3143    #[test]
3144    fn error_reason_sql_exec_insufficient_privileges() {
3145        let err = ReconcileError::SqlExec(insufficient_privilege_sqlx_error());
3146        assert_eq!(err.reason(), "InsufficientPrivileges");
3147    }
3148
3149    #[test]
3150    fn error_reason_inspect_insufficient_privileges() {
3151        let err = ReconcileError::Inspect(pgroles_inspect::InspectError::Database(
3152            insufficient_privilege_sqlx_error(),
3153        ));
3154        assert_eq!(err.reason(), "InsufficientPrivileges");
3155    }
3156
3157    #[test]
3158    fn error_display_includes_details() {
3159        let err = ReconcileError::InvalidInterval("5x".into(), "unknown unit 'x'".into());
3160        let msg = err.to_string();
3161        assert!(msg.contains("5x"), "error display should contain interval");
3162        assert!(
3163            msg.contains("unknown unit"),
3164            "error display should contain reason"
3165        );
3166    }
3167
3168    #[test]
3169    fn error_reason_lock_contention() {
3170        let err = ReconcileError::LockContention(
3171            "prod/db-creds/DATABASE_URL".into(),
3172            "in-process lock held".into(),
3173        );
3174        assert_eq!(err.reason(), "LockContention");
3175    }
3176
3177    #[test]
3178    fn error_display_lock_contention_includes_database() {
3179        let err = ReconcileError::LockContention(
3180            "prod/db-creds/DATABASE_URL".into(),
3181            "advisory lock held by another session".into(),
3182        );
3183        let msg = err.to_string();
3184        assert!(
3185            msg.contains("prod/db-creds/DATABASE_URL"),
3186            "lock contention error should include database identity"
3187        );
3188        assert!(
3189            msg.contains("advisory lock"),
3190            "lock contention error should include reason"
3191        );
3192    }
3193
3194    #[test]
3195    fn requeue_with_jitter_produces_bounded_delay() {
3196        // Run multiple times to exercise the jitter distribution.
3197        let base = LOCK_CONTENTION_BASE_SECS;
3198        let max = LOCK_CONTENTION_BASE_SECS + LOCK_CONTENTION_JITTER_SECS;
3199        for _ in 0..20 {
3200            let delay = jitter_delay();
3201            let secs = delay.as_secs();
3202            assert!(
3203                secs >= base,
3204                "jitter delay {secs}s should be at least base {base}s",
3205            );
3206            assert!(
3207                secs <= max,
3208                "jitter delay {secs}s should not exceed base+jitter {max}s",
3209            );
3210        }
3211    }
3212
3213    #[test]
3214    fn lock_contention_constants_are_reasonable() {
3215        // Use variables to avoid clippy::assertions_on_constants.
3216        let base = LOCK_CONTENTION_BASE_SECS;
3217        let jitter = LOCK_CONTENTION_JITTER_SECS;
3218        assert!(base > 0, "base delay must be positive");
3219        assert!(jitter > 0, "jitter window must be positive");
3220        assert!(
3221            base + jitter <= 60,
3222            "total max contention delay should not exceed error_policy's 60s"
3223        );
3224    }
3225
3226    #[test]
3227    fn transient_backoff_delay_is_bounded_and_caps() {
3228        for _ in 0..20 {
3229            let first = transient_backoff_delay(1).as_secs();
3230            assert!((TRANSIENT_BACKOFF_BASE_SECS..=7).contains(&first));
3231
3232            let fourth = transient_backoff_delay(4).as_secs();
3233            assert!((40..=60).contains(&fourth));
3234
3235            let capped = transient_backoff_delay(10).as_secs();
3236            assert_eq!(capped, TRANSIENT_BACKOFF_MAX_SECS);
3237        }
3238    }
3239
3240    #[test]
3241    fn slow_retry_delay_uses_policy_interval() {
3242        let resource = test_policy("7m", 0);
3243        assert_eq!(slow_retry_delay(&resource), Duration::from_secs(420));
3244    }
3245
3246    #[test]
3247    fn slow_retry_delay_falls_back_on_invalid_interval() {
3248        let resource = test_policy("nope", 0);
3249        assert_eq!(
3250            slow_retry_delay(&resource),
3251            Duration::from_secs(DEFAULT_REQUEUE_SECS)
3252        );
3253    }
3254
3255    #[test]
3256    fn retry_classifies_lock_contention_separately() {
3257        let error = finalizer::Error::ApplyFailed(ReconcileError::LockContention(
3258            "default/db-credentials/DATABASE_URL".into(),
3259            "lock held".into(),
3260        ));
3261        assert_eq!(retry_class(&error), RetryClass::LockContention);
3262    }
3263
3264    #[test]
3265    fn retry_classifies_invalid_spec_as_slow() {
3266        let error = finalizer::Error::ApplyFailed(ReconcileError::InvalidInterval(
3267            "oops".into(),
3268            "bad interval".into(),
3269        ));
3270        assert_eq!(retry_class(&error), RetryClass::Slow);
3271    }
3272
3273    #[test]
3274    fn retry_classifies_missing_database_objects_as_slow() {
3275        let error = finalizer::Error::ApplyFailed(ReconcileError::MissingDatabaseObjects(
3276            "schema \"etl\"".into(),
3277        ));
3278        assert_eq!(retry_class(&error), RetryClass::Slow);
3279    }
3280
3281    #[test]
3282    fn retry_classifies_unsatisfiable_wildcard_grant_as_slow() {
3283        let error = finalizer::Error::ApplyFailed(ReconcileError::UnsatisfiableWildcardGrant(
3284            "UnsatisfiableWildcardGrant: function f2() is not grantable".into(),
3285        ));
3286        assert_eq!(retry_class(&error), RetryClass::Slow);
3287    }
3288
3289    #[test]
3290    fn retry_classifies_plan_sql_storage_as_slow() {
3291        let error =
3292            finalizer::Error::ApplyFailed(ReconcileError::PlanSqlStorage("gzip failed".into()));
3293        assert_eq!(retry_class(&error), RetryClass::Slow);
3294    }
3295
3296    #[test]
3297    fn retry_classifies_secret_missing_as_slow() {
3298        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3299            crate::context::ContextError::SecretMissing {
3300                name: "db-credentials".into(),
3301                key: "DATABASE_URL".into(),
3302            },
3303        )));
3304        assert_eq!(retry_class(&error), RetryClass::Slow);
3305    }
3306
3307    #[test]
3308    fn retry_classifies_secret_fetch_not_found_as_slow() {
3309        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3310            crate::context::ContextError::SecretFetch {
3311                name: "db-credentials".into(),
3312                namespace: "default".into(),
3313                source: kube::Error::Api(
3314                    kube::core::Status::failure("secrets \"db-credentials\" not found", "NotFound")
3315                        .with_code(404)
3316                        .boxed(),
3317                ),
3318            },
3319        )));
3320        assert_eq!(retry_class(&error), RetryClass::Slow);
3321    }
3322
3323    #[test]
3324    fn retry_classifies_secret_fetch_transport_errors_as_transient() {
3325        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3326            crate::context::ContextError::SecretFetch {
3327                name: "db-credentials".into(),
3328                namespace: "default".into(),
3329                source: kube::Error::Api(
3330                    kube::core::Status::failure("internal error", "InternalError")
3331                        .with_code(500)
3332                        .boxed(),
3333                ),
3334            },
3335        )));
3336        assert_eq!(retry_class(&error), RetryClass::Transient);
3337    }
3338
3339    #[test]
3340    fn retry_classifies_secret_fetch_forbidden_as_slow() {
3341        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3342            crate::context::ContextError::SecretFetch {
3343                name: "db-credentials".into(),
3344                namespace: "default".into(),
3345                source: kube::Error::Api(
3346                    kube::core::Status::failure("forbidden", "Forbidden")
3347                        .with_code(403)
3348                        .boxed(),
3349                ),
3350            },
3351        )));
3352        assert_eq!(retry_class(&error), RetryClass::Slow);
3353    }
3354
3355    #[test]
3356    fn retry_classifies_database_connect_as_transient() {
3357        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3358            crate::context::ContextError::DatabaseConnect {
3359                source: sqlx::Error::PoolTimedOut,
3360            },
3361        )));
3362        assert_eq!(retry_class(&error), RetryClass::Transient);
3363    }
3364
3365    #[test]
3366    fn retry_classifies_sql_exec_insufficient_privilege_as_slow() {
3367        let error = finalizer::Error::ApplyFailed(ReconcileError::SqlExec(
3368            insufficient_privilege_sqlx_error(),
3369        ));
3370        assert_eq!(retry_class(&error), RetryClass::Slow);
3371    }
3372
3373    #[test]
3374    fn retry_classifies_inspect_insufficient_privilege_as_slow() {
3375        let error = finalizer::Error::ApplyFailed(ReconcileError::Inspect(
3376            pgroles_inspect::InspectError::Database(insufficient_privilege_sqlx_error()),
3377        ));
3378        assert_eq!(retry_class(&error), RetryClass::Slow);
3379    }
3380
3381    #[test]
3382    fn classify_sqlx_error_categories() {
3383        assert_eq!(
3384            classify_sqlx_error(&insufficient_privilege_sqlx_error()),
3385            SqlErrorKind::InsufficientPrivileges
3386        );
3387        assert_eq!(
3388            classify_sqlx_error(&missing_schema_sqlx_error()),
3389            SqlErrorKind::MissingDatabaseObject
3390        );
3391        assert_eq!(
3392            classify_sqlx_error(&missing_table_sqlx_error()),
3393            SqlErrorKind::MissingDatabaseObject
3394        );
3395        assert_eq!(
3396            classify_sqlx_error(&missing_function_sqlx_error()),
3397            SqlErrorKind::MissingDatabaseObject
3398        );
3399        assert_eq!(
3400            classify_sqlx_error(&missing_object_sqlx_error()),
3401            SqlErrorKind::MissingDatabaseObject
3402        );
3403        assert_eq!(
3404            classify_sqlx_error(&transient_sqlx_error()),
3405            SqlErrorKind::Transient
3406        );
3407    }
3408
3409    #[test]
3410    fn retry_classifies_sql_exec_missing_schema_as_slow() {
3411        let error =
3412            finalizer::Error::ApplyFailed(ReconcileError::SqlExec(missing_schema_sqlx_error()));
3413        assert_eq!(retry_class(&error), RetryClass::Slow);
3414    }
3415
3416    #[test]
3417    fn retry_classifies_sql_exec_missing_table_as_slow() {
3418        let error =
3419            finalizer::Error::ApplyFailed(ReconcileError::SqlExec(missing_table_sqlx_error()));
3420        assert_eq!(retry_class(&error), RetryClass::Slow);
3421    }
3422
3423    #[test]
3424    fn retry_classifies_inspect_missing_schema_as_slow() {
3425        let error = finalizer::Error::ApplyFailed(ReconcileError::Inspect(
3426            pgroles_inspect::InspectError::Database(missing_schema_sqlx_error()),
3427        ));
3428        assert_eq!(retry_class(&error), RetryClass::Slow);
3429    }
3430
3431    #[test]
3432    fn error_reason_sql_exec_missing_database_object() {
3433        let err = ReconcileError::SqlExec(missing_schema_sqlx_error());
3434        assert_eq!(err.reason(), "MissingDatabaseObject");
3435    }
3436
3437    #[test]
3438    fn error_reason_inspect_missing_database_object() {
3439        let err = ReconcileError::Inspect(pgroles_inspect::InspectError::Database(
3440            missing_table_sqlx_error(),
3441        ));
3442        assert_eq!(err.reason(), "MissingDatabaseObject");
3443    }
3444
3445    #[test]
3446    fn retry_classifies_empty_resolved_value_as_slow() {
3447        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3448            crate::context::ContextError::EmptyResolvedValue {
3449                field: "password".to_string(),
3450            },
3451        )));
3452        assert_eq!(retry_class(&error), RetryClass::Slow);
3453    }
3454
3455    #[test]
3456    fn error_reason_empty_resolved_value() {
3457        let err =
3458            ReconcileError::Context(Box::new(crate::context::ContextError::EmptyResolvedValue {
3459                field: "host".to_string(),
3460            }));
3461        assert_eq!(err.reason(), "InvalidConnectionParams");
3462    }
3463
3464    #[test]
3465    fn retry_classifies_invalid_resolved_ssl_mode_as_slow() {
3466        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3467            crate::context::ContextError::InvalidResolvedSslMode {
3468                value: "bogus".to_string(),
3469            },
3470        )));
3471        assert_eq!(retry_class(&error), RetryClass::Slow);
3472    }
3473
3474    #[test]
3475    fn error_reason_invalid_resolved_ssl_mode() {
3476        let err = ReconcileError::Context(Box::new(
3477            crate::context::ContextError::InvalidResolvedSslMode {
3478                value: "bogus".to_string(),
3479            },
3480        ));
3481        assert_eq!(err.reason(), "InvalidConnectionParams");
3482    }
3483
3484    #[test]
3485    fn error_reason_sql_exec_transient_is_apply_failed() {
3486        let err = ReconcileError::SqlExec(transient_sqlx_error());
3487        assert_eq!(err.reason(), "ApplyFailed");
3488    }
3489
3490    #[test]
3491    fn error_reason_plan_sql_storage_failed() {
3492        let err = ReconcileError::PlanSqlStorage("gzip failed".into());
3493        assert_eq!(err.reason(), "PlanSqlStorageFailed");
3494    }
3495
3496    #[test]
3497    fn error_policy_uses_normal_interval_for_invalid_spec() {
3498        let resource = test_policy("11m", 0);
3499        let error = finalizer::Error::ApplyFailed(ReconcileError::InvalidInterval(
3500            "oops".into(),
3501            "bad interval".into(),
3502        ));
3503        assert_eq!(
3504            retry_action(&resource, &error),
3505            Action::requeue(Duration::from_secs(660))
3506        );
3507    }
3508
3509    #[test]
3510    fn error_policy_uses_exponential_backoff_for_transient_failures() {
3511        let resource = test_policy("5m", 3);
3512        let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3513            crate::context::ContextError::DatabaseConnect {
3514                source: sqlx::Error::PoolTimedOut,
3515            },
3516        )));
3517        let action = retry_action(&resource, &error);
3518        assert!(
3519            (40..=60).any(|secs| action == Action::requeue(Duration::from_secs(secs))),
3520            "expected transient retry between 40s and 60s, got {action:?}"
3521        );
3522    }
3523
3524    #[test]
3525    fn render_plan_sql_for_status_redacts_passwords() {
3526        let changes = vec![
3527            pgroles_core::diff::Change::CreateRole {
3528                name: "app-svc".to_string(),
3529                state: pgroles_core::model::RoleState {
3530                    login: true,
3531                    ..pgroles_core::model::RoleState::default()
3532                },
3533            },
3534            pgroles_core::diff::Change::SetPassword {
3535                name: "app-svc".to_string(),
3536                password: "super_secret_p@ssw0rd!".to_string(),
3537            },
3538        ];
3539
3540        let sql_ctx = pgroles_core::sql::SqlContext::default();
3541        let (sql, truncated) = render_plan_sql_for_status(&changes, &sql_ctx);
3542
3543        let sql = sql.expect("expected non-empty planned SQL");
3544        assert!(!truncated);
3545        assert!(
3546            sql.contains("[REDACTED]"),
3547            "status SQL should contain [REDACTED], got: {sql}"
3548        );
3549        assert!(
3550            !sql.contains("super_secret_p@ssw0rd!"),
3551            "status SQL must NOT contain the actual password, got: {sql}"
3552        );
3553        assert!(
3554            sql.contains("CREATE ROLE"),
3555            "status SQL should still contain non-password changes, got: {sql}"
3556        );
3557    }
3558
3559    #[test]
3560    fn render_plan_sql_for_status_empty_changes_returns_none() {
3561        let sql_ctx = pgroles_core::sql::SqlContext::default();
3562        let (sql, truncated) = render_plan_sql_for_status(&[], &sql_ctx);
3563        assert!(sql.is_none());
3564        assert!(!truncated);
3565    }
3566
3567    #[test]
3568    fn render_plan_sql_for_status_password_only_plan() {
3569        let changes = vec![pgroles_core::diff::Change::SetPassword {
3570            name: "db-user".to_string(),
3571            password: "my_secret_pw".to_string(),
3572        }];
3573
3574        let sql_ctx = pgroles_core::sql::SqlContext::default();
3575        let (sql, _) = render_plan_sql_for_status(&changes, &sql_ctx);
3576
3577        let sql = sql.expect("expected non-empty planned SQL");
3578        assert!(
3579            sql.contains("[REDACTED]"),
3580            "password-only plan should still show redacted SQL"
3581        );
3582        assert!(
3583            !sql.contains("my_secret_pw"),
3584            "password-only plan must NOT leak the password"
3585        );
3586    }
3587
3588    #[test]
3589    fn error_reason_empty_password_secret() {
3590        let err = ReconcileError::EmptyPasswordSecret {
3591            role: "app-svc".to_string(),
3592            secret: "pg-passwords".to_string(),
3593            key: "app-svc".to_string(),
3594        };
3595        assert_eq!(err.reason(), "InvalidSpec");
3596    }
3597
3598    #[test]
3599    fn retry_classifies_empty_password_secret_as_slow() {
3600        let error = finalizer::Error::ApplyFailed(ReconcileError::EmptyPasswordSecret {
3601            role: "app-svc".to_string(),
3602            secret: "pg-passwords".to_string(),
3603            key: "app-svc".to_string(),
3604        });
3605        assert_eq!(retry_class(&error), RetryClass::Slow);
3606    }
3607
3608    #[test]
3609    fn error_reason_password_generation() {
3610        let err = ReconcileError::PasswordGeneration(Box::new(
3611            crate::password::PasswordError::MissingKey {
3612                secret: "my-secret".to_string(),
3613                key: "password".to_string(),
3614            },
3615        ));
3616        assert_eq!(err.reason(), "SecretFetchFailed");
3617    }
3618
3619    #[test]
3620    fn retry_classifies_password_generation_missing_key_as_slow() {
3621        let error = finalizer::Error::ApplyFailed(ReconcileError::PasswordGeneration(Box::new(
3622            crate::password::PasswordError::MissingKey {
3623                secret: "my-secret".to_string(),
3624                key: "password".to_string(),
3625            },
3626        )));
3627        assert_eq!(retry_class(&error), RetryClass::Slow);
3628    }
3629
3630    #[test]
3631    fn retry_classifies_password_generation_kube_server_error_as_transient() {
3632        let error = finalizer::Error::ApplyFailed(ReconcileError::PasswordGeneration(Box::new(
3633            crate::password::PasswordError::KubeApi {
3634                secret: "my-secret".to_string(),
3635                source: Box::new(kube::Error::Api(
3636                    kube::core::Status::failure("internal error", "InternalError")
3637                        .with_code(500)
3638                        .boxed(),
3639                )),
3640            },
3641        )));
3642        assert_eq!(retry_class(&error), RetryClass::Transient);
3643    }
3644
3645    #[test]
3646    fn retry_classifies_password_generation_kube_forbidden_as_slow() {
3647        let error = finalizer::Error::ApplyFailed(ReconcileError::PasswordGeneration(Box::new(
3648            crate::password::PasswordError::KubeApi {
3649                secret: "my-secret".to_string(),
3650                source: Box::new(kube::Error::Api(
3651                    kube::core::Status::failure("forbidden", "Forbidden")
3652                        .with_code(403)
3653                        .boxed(),
3654                )),
3655            },
3656        )));
3657        assert_eq!(retry_class(&error), RetryClass::Slow);
3658    }
3659
3660    #[test]
3661    fn accumulate_summary_counts_passwords() {
3662        use pgroles_core::diff::Change;
3663
3664        let mut summary = ChangeSummary::default();
3665        accumulate_summary(
3666            &mut summary,
3667            &Change::SetPassword {
3668                name: "app-svc".to_string(),
3669                password: "secret".to_string(),
3670            },
3671        );
3672        assert_eq!(summary.passwords_set, 1);
3673    }
3674
3675    #[test]
3676    fn conflict_detection_ignores_invalid_peer_policies() {
3677        let resource = valid_role_policy("valid-policy", "analytics", "shared-db-secret");
3678        let identity = DatabaseIdentity::from_connection("default", &resource.spec.connection);
3679        let ownership = resource.spec.ownership_claims().unwrap();
3680        let invalid_peer = invalid_profile_policy("invalid-peer", "shared-db-secret");
3681
3682        let conflict =
3683            detect_policy_conflict_in_list(&resource, &identity, &ownership, vec![invalid_peer]);
3684
3685        assert_eq!(conflict, None);
3686    }
3687
3688    #[test]
3689    fn resolve_passwords_from_cached_secrets_supports_default_and_explicit_keys() {
3690        let resource = password_role_policy();
3691        let cache = BTreeMap::from([(
3692            "role-passwords".to_string(),
3693            secret_with_keys(
3694                "role-passwords",
3695                &[
3696                    ("app", "app-secret"),
3697                    ("reporter-password", "reporter-secret"),
3698                ],
3699            ),
3700        )]);
3701
3702        let resolved =
3703            resolve_passwords_from_cached_secrets(&resource, &cache).expect("should resolve");
3704
3705        assert_eq!(
3706            resolved
3707                .get("app")
3708                .map(|password| password.cleartext.as_str()),
3709            Some("app-secret")
3710        );
3711        assert_eq!(
3712            resolved
3713                .get("reporter")
3714                .map(|password| password.cleartext.as_str()),
3715            Some("reporter-secret")
3716        );
3717    }
3718
3719    #[test]
3720    fn resolve_passwords_from_cached_secrets_reports_missing_key() {
3721        let resource = password_role_policy();
3722        let cache = BTreeMap::from([(
3723            "role-passwords".to_string(),
3724            secret_with_keys("role-passwords", &[("app", "app-secret")]),
3725        )]);
3726
3727        let err = resolve_passwords_from_cached_secrets(&resource, &cache).unwrap_err();
3728        let context = match err {
3729            ReconcileError::Context(context) => context,
3730            other => panic!("expected context error, got {other:?}"),
3731        };
3732        assert!(matches!(
3733            *context,
3734            crate::context::ContextError::SecretMissing { ref name, ref key }
3735            if name == "role-passwords" && key == "reporter-password"
3736        ));
3737    }
3738
3739    #[test]
3740    fn resolve_passwords_from_cached_secrets_reports_empty_password() {
3741        let resource = password_role_policy();
3742        let cache = BTreeMap::from([(
3743            "role-passwords".to_string(),
3744            secret_with_keys(
3745                "role-passwords",
3746                &[("app", ""), ("reporter-password", "ok")],
3747            ),
3748        )]);
3749
3750        let err = resolve_passwords_from_cached_secrets(&resource, &cache).unwrap_err();
3751        assert!(matches!(
3752            err,
3753            ReconcileError::EmptyPasswordSecret { ref role, ref secret, ref key }
3754            if role == "app" && secret == "role-passwords" && key == "app"
3755        ));
3756    }
3757
3758    #[test]
3759    fn resolve_passwords_from_cached_secrets_allows_whitespace_passwords() {
3760        let resource = password_role_policy();
3761        let cache = BTreeMap::from([(
3762            "role-passwords".to_string(),
3763            secret_with_keys(
3764                "role-passwords",
3765                &[("app", "   "), ("reporter-password", "\tsecret")],
3766            ),
3767        )]);
3768
3769        let resolved =
3770            resolve_passwords_from_cached_secrets(&resource, &cache).expect("should resolve");
3771
3772        assert_eq!(
3773            resolved
3774                .get("app")
3775                .map(|password| password.cleartext.as_str()),
3776            Some("   ")
3777        );
3778        assert_eq!(
3779            resolved
3780                .get("reporter")
3781                .map(|password| password.cleartext.as_str()),
3782            Some("\tsecret")
3783        );
3784    }
3785
3786    #[test]
3787    fn select_password_changes_skips_unchanged_password_sources() {
3788        let resolved = BTreeMap::from([(
3789            "app".to_string(),
3790            ResolvedPassword {
3791                cleartext: "app-secret".to_string(),
3792                source_version: "role-passwords:app:7".to_string(),
3793            },
3794        )]);
3795        let status = PostgresPolicyStatus {
3796            applied_password_source_versions: BTreeMap::from([(
3797                "app".to_string(),
3798                "role-passwords:app:7".to_string(),
3799            )]),
3800            ..Default::default()
3801        };
3802
3803        let (password_changes, current_versions) =
3804            select_password_changes(&[], &resolved, Some(&status));
3805
3806        assert!(password_changes.is_empty());
3807        assert_eq!(
3808            current_versions.get("app").map(String::as_str),
3809            Some("role-passwords:app:7")
3810        );
3811    }
3812
3813    #[test]
3814    fn select_password_changes_applies_on_source_version_change() {
3815        let resolved = BTreeMap::from([(
3816            "app".to_string(),
3817            ResolvedPassword {
3818                cleartext: "new-secret".to_string(),
3819                source_version: "role-passwords:app:8".to_string(),
3820            },
3821        )]);
3822        let status = PostgresPolicyStatus {
3823            applied_password_source_versions: BTreeMap::from([(
3824                "app".to_string(),
3825                "role-passwords:app:7".to_string(),
3826            )]),
3827            ..Default::default()
3828        };
3829
3830        let (password_changes, _) = select_password_changes(&[], &resolved, Some(&status));
3831
3832        assert_eq!(
3833            password_changes.get("app").map(String::as_str),
3834            Some("new-secret")
3835        );
3836    }
3837
3838    #[test]
3839    fn select_password_changes_applies_for_newly_created_role() {
3840        use pgroles_core::diff::Change;
3841        use pgroles_core::model::RoleState;
3842
3843        let resolved = BTreeMap::from([(
3844            "app".to_string(),
3845            ResolvedPassword {
3846                cleartext: "new-secret".to_string(),
3847                source_version: "role-passwords:app:7".to_string(),
3848            },
3849        )]);
3850        let status = PostgresPolicyStatus {
3851            applied_password_source_versions: BTreeMap::from([(
3852                "app".to_string(),
3853                "role-passwords:app:7".to_string(),
3854            )]),
3855            ..Default::default()
3856        };
3857        let changes = vec![Change::CreateRole {
3858            name: "app".to_string(),
3859            state: RoleState {
3860                login: true,
3861                ..RoleState::default()
3862            },
3863        }];
3864
3865        let (password_changes, _) = select_password_changes(&changes, &resolved, Some(&status));
3866
3867        assert_eq!(
3868            password_changes.get("app").map(String::as_str),
3869            Some("new-secret")
3870        );
3871    }
3872
3873    #[test]
3874    fn select_password_changes_applies_all_on_first_reconcile() {
3875        // When status is None (first reconcile), all passwords should be applied
3876        // since there are no previous source versions to compare against.
3877        let resolved = BTreeMap::from([
3878            (
3879                "app".to_string(),
3880                ResolvedPassword {
3881                    cleartext: "secret-a".to_string(),
3882                    source_version: "role-passwords:app:1".to_string(),
3883                },
3884            ),
3885            (
3886                "reporter".to_string(),
3887                ResolvedPassword {
3888                    cleartext: "secret-b".to_string(),
3889                    source_version: "role-passwords:reporter:1".to_string(),
3890                },
3891            ),
3892        ]);
3893        let changes: Vec<pgroles_core::diff::Change> = vec![];
3894
3895        let (password_changes, versions) = select_password_changes(&changes, &resolved, None);
3896
3897        assert_eq!(
3898            password_changes.len(),
3899            2,
3900            "all passwords should be applied on first reconcile"
3901        );
3902        assert_eq!(
3903            password_changes.get("app").map(String::as_str),
3904            Some("secret-a")
3905        );
3906        assert_eq!(
3907            password_changes.get("reporter").map(String::as_str),
3908            Some("secret-b")
3909        );
3910        assert_eq!(versions.len(), 2, "all source versions should be tracked");
3911    }
3912
3913    #[test]
3914    fn conflict_detection_still_reports_overlapping_valid_peers() {
3915        let resource = valid_role_policy("valid-policy", "analytics", "shared-db-secret");
3916        let identity = DatabaseIdentity::from_connection("default", &resource.spec.connection);
3917        let ownership = resource.spec.ownership_claims().unwrap();
3918        let overlapping_peer =
3919            valid_role_policy("overlapping-peer", "analytics", "shared-db-secret");
3920        let invalid_peer = invalid_profile_policy("invalid-peer", "shared-db-secret");
3921
3922        let conflict = detect_policy_conflict_in_list(
3923            &resource,
3924            &identity,
3925            &ownership,
3926            vec![invalid_peer, overlapping_peer],
3927        );
3928
3929        let conflict = conflict.expect("expected overlapping peer to be reported");
3930        assert!(conflict.contains("overlapping-peer"));
3931        assert!(conflict.contains("roles: analytics"));
3932    }
3933
3934    #[test]
3935    fn parse_rfc3339_to_epoch_secs_known_timestamp() {
3936        // 2024-01-01T00:00:00Z = 1704067200
3937        let result = parse_rfc3339_to_epoch_secs("2024-01-01T00:00:00Z");
3938        assert_eq!(result, Some(1704067200));
3939    }
3940
3941    #[test]
3942    fn parse_rfc3339_to_epoch_secs_with_time() {
3943        // 2024-01-01T12:30:45Z = 1704067200 + 12*3600 + 30*60 + 45 = 1704112245
3944        let result = parse_rfc3339_to_epoch_secs("2024-01-01T12:30:45Z");
3945        assert_eq!(result, Some(1704112245));
3946    }
3947
3948    #[test]
3949    fn parse_rfc3339_to_epoch_secs_invalid_returns_none() {
3950        assert_eq!(parse_rfc3339_to_epoch_secs("not-a-date"), None);
3951        assert_eq!(parse_rfc3339_to_epoch_secs(""), None);
3952    }
3953
3954    #[test]
3955    fn parse_rfc3339_roundtrips_with_now_rfc3339() {
3956        let timestamp = crate::crd::now_rfc3339();
3957        let parsed = parse_rfc3339_to_epoch_secs(&timestamp);
3958        assert!(parsed.is_some(), "should parse our own timestamps");
3959        let now_secs = std::time::SystemTime::now()
3960            .duration_since(std::time::UNIX_EPOCH)
3961            .unwrap()
3962            .as_secs();
3963        // Should be within 2 seconds of now.
3964        let diff = now_secs.abs_diff(parsed.unwrap());
3965        assert!(diff <= 2, "parsed time should be close to now, diff={diff}");
3966    }
3967}