Skip to main content

pgroles_operator/
plan.rs

1//! Plan lifecycle management for `PostgresPolicyPlan` resources.
2//!
3//! Handles creating, deduplicating, approving, executing, and cleaning up
4//! reconciliation plans. Plans represent computed SQL change sets that may
5//! require explicit approval before execution against a database.
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::io::Write;
9use std::time::Duration;
10
11use flate2::Compression;
12use flate2::write::GzEncoder;
13use k8s_openapi::ByteString;
14use k8s_openapi::api::core::v1::ConfigMap;
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
16use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams};
17use kube::{Client, Resource, ResourceExt};
18use sha2::{Digest, Sha256};
19use tracing::info;
20
21use crate::crd::{
22    ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_PLAN, LABEL_POLICY,
23    PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
24    PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
25    PostgresPolicyPlanStatus, SqlCompression, SqlRef,
26};
27use crate::reconciler::ReconcileError;
28
/// Outcome of a plan-creation attempt.
///
/// Keeps freshly created plans apart from deduplication hits so that callers
/// can decide whether an event should be emitted.
#[derive(Debug, Clone)]
pub enum PlanCreationResult {
    /// A brand-new plan was created; carries its name.
    Created(String),
    /// A plan with the same hash already existed; carries its name.
    Deduplicated(String),
}

impl PlanCreationResult {
    /// Name of the plan this result refers to, whichever variant it is.
    pub fn plan_name(&self) -> &str {
        // Both variants carry exactly one name, so the or-pattern is irrefutable.
        let (Self::Created(name) | Self::Deduplicated(name)) = self;
        name
    }

    /// Whether this result stands for a plan that was actually created.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Deduplicated(_) => false,
        }
    }
}
52
/// Maximum inline SQL size in plan status before spilling to a ConfigMap.
/// Redacted SQL at or below this many bytes is kept inline unchanged; it is
/// also the byte budget used for the truncated inline preview.
const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;

/// ConfigMap binaryData key for gzip-compressed SQL content.
const SQL_CONFIGMAP_GZIP_KEY: &str = "plan.sql.gz";

/// Conservative stored-byte ceiling for SQL ConfigMaps. Kubernetes caps
/// ConfigMap data at 1 MiB; this leaves room for metadata and future labels.
/// Compressed SQL larger than this falls back to a truncated inline preview.
const MAX_CONFIGMAP_SQL_BYTES: usize = 900 * 1024;

/// Stale status-less plan and orphan ConfigMap grace period, in seconds.
const ORPHAN_GRACE_SECS: i64 = 60;

/// Best-effort cleanup should never block a fresh reconcile for long.
/// Timeout, in seconds, applied to the best-effort cleanup wrapper.
const CLEANUP_TIMEOUT_SECS: u64 = 5;

/// Default maximum number of historical plans to retain per policy.
/// Used when the caller does not supply an explicit limit.
const DEFAULT_MAX_PLANS: usize = 10;

/// How recently a Failed plan must have been created (in seconds) for the
/// dedup check to consider it a match. Plans older than this are ignored so
/// that retries after the user fixes the environment are not blocked.
const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
76
/// How the redacted SQL preview of a plan is stored.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PlanSqlArtifact {
    /// The complete redacted SQL, small enough to be kept inline.
    Inline(String),
    /// Gzip-compressed redacted SQL destined for a ConfigMap.
    CompressedConfigMap {
        /// Name of the ConfigMap that holds the payload.
        configmap_name: String,
        /// binaryData key under which the payload is stored.
        key: String,
        /// The gzip-compressed SQL bytes.
        compressed_sql: Vec<u8>,
    },
    /// A truncated inline preview, used when even the compressed SQL is too
    /// large for a ConfigMap.
    TruncatedInline(String),
}
87
/// A redacted SQL preview together with the bookkeeping recorded on the plan.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PreparedPlanSql {
    /// Where and in what form the preview is stored.
    artifact: PlanSqlArtifact,
    /// Hash of the complete redacted SQL (before compression or truncation).
    redacted_sql_hash: String,
    /// Byte length of the complete redacted SQL.
    original_bytes: usize,
    /// Byte length of what is actually stored: the inline text, the
    /// compressed payload, or the truncated preview.
    stored_bytes: usize,
}
95
96impl PreparedPlanSql {
97    fn sql_ref(&self) -> Option<SqlRef> {
98        match &self.artifact {
99            PlanSqlArtifact::CompressedConfigMap {
100                configmap_name,
101                key,
102                ..
103            } => Some(SqlRef {
104                name: configmap_name.clone(),
105                key: key.clone(),
106                compression: Some(SqlCompression::Gzip),
107            }),
108            PlanSqlArtifact::Inline(_) | PlanSqlArtifact::TruncatedInline(_) => None,
109        }
110    }
111
112    fn sql_inline(&self) -> Option<String> {
113        match &self.artifact {
114            PlanSqlArtifact::Inline(sql) | PlanSqlArtifact::TruncatedInline(sql) => {
115                Some(sql.clone())
116            }
117            PlanSqlArtifact::CompressedConfigMap { .. } => None,
118        }
119    }
120
121    fn is_truncated(&self) -> bool {
122        matches!(self.artifact, PlanSqlArtifact::TruncatedInline(_))
123    }
124}
125
126// ---------------------------------------------------------------------------
127// Plan approval check
128// ---------------------------------------------------------------------------
129
/// Result of checking a plan's approval annotations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanApprovalState {
    /// Neither the rejected nor the approved annotation is set to `"true"`.
    Pending,
    /// The approved annotation is `"true"` and the rejected one is not.
    Approved,
    /// The rejected annotation is `"true"`; this wins over any approval.
    Rejected,
}
137
138/// Check the approval state of a plan by inspecting its annotations.
139///
140/// Rejection takes priority over approval: if both annotations are set,
141/// the plan is considered rejected.
142pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
143    let annotations = plan.metadata.annotations.as_ref();
144
145    let rejected = annotations
146        .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
147        .map(|v| v == "true")
148        .unwrap_or(false);
149
150    if rejected {
151        return PlanApprovalState::Rejected;
152    }
153
154    let approved = annotations
155        .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
156        .map(|v| v == "true")
157        .unwrap_or(false);
158
159    if approved {
160        return PlanApprovalState::Approved;
161    }
162
163    PlanApprovalState::Pending
164}
165
166// ---------------------------------------------------------------------------
167// Plan creation
168// ---------------------------------------------------------------------------
169
170/// Create or deduplicate a `PostgresPolicyPlan` for the given policy and changes.
171///
172/// Returns the name of the plan resource (either existing or newly created).
173///
174/// This function:
175/// 1. Renders the full executable SQL from the changes
176/// 2. Computes SHA-256 of the full SQL (before any redaction/truncation)
177/// 3. Checks for an existing Pending plan with the same hash (dedup)
178/// 4. Persists the SQL preview artifact, if needed
179/// 5. Creates the new plan resource with ownerReferences
180/// 6. Updates the plan status
181/// 7. Marks any older Pending plans with a different hash as Superseded
182#[allow(clippy::too_many_arguments)]
183pub async fn create_or_update_plan(
184    client: &Client,
185    policy: &PostgresPolicy,
186    changes: &[pgroles_core::diff::Change],
187    sql_context: &pgroles_core::sql::SqlContext,
188    inspect_config: &pgroles_inspect::InspectConfig,
189    reconciliation_mode: CrdReconciliationMode,
190    database_identity: &str,
191    change_summary: &ChangeSummary,
192) -> Result<PlanCreationResult, ReconcileError> {
193    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
194    let policy_name = policy.name_any();
195    let generation = policy.metadata.generation.unwrap_or(0);
196
197    // 1. Render the full executable SQL (not redacted).
198    let full_sql = render_full_sql(changes, sql_context);
199
200    // 2. Compute SHA-256 hash of the full SQL.
201    let sql_hash = compute_sql_hash(&full_sql);
202
203    // 3. Count SQL statements (after wildcard expansion).
204    let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
205
206    // 4. Render redacted SQL for display (passwords masked).
207    let redacted_sql = render_redacted_sql(changes, sql_context);
208
209    cleanup_old_plans_best_effort(client, policy, None).await;
210
211    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
212
213    // 4. List existing plans for this policy.
214    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
215    let existing_plans = plans_api
216        .list(&ListParams::default().labels(&label_selector))
217        .await?;
218
219    // 5. Check for duplicate pending plan with same hash.
220    for plan in &existing_plans {
221        if let Some(ref status) = plan.status
222            && status.phase == PlanPhase::Pending
223            && status.sql_hash.as_deref() == Some(&sql_hash)
224        {
225            // Identical plan already exists — return early (deduplicated).
226            let plan_name = plan.name_any();
227            info!(
228                plan = %plan_name,
229                policy = %policy_name,
230                "existing pending plan has identical SQL hash, skipping creation"
231            );
232            return Ok(PlanCreationResult::Deduplicated(plan_name));
233        }
234    }
235
236    // 5b. Check for recently-failed plan with the same hash. If a plan with
237    // this exact SQL already failed within the dedup window, creating another
238    // identical one is pointless — it would produce the same error. The window
239    // ensures we don't block retries after the user fixes the environment.
240    //
241    // Uses `status.failed_at` (not `creation_timestamp`) so that plans which
242    // waited for approval before failing are measured from the failure time.
243    let now_ts = now_epoch_secs();
244    for plan in &existing_plans {
245        if let Some(ref status) = plan.status
246            && status.phase == PlanPhase::Failed
247            && status.sql_hash.as_deref() == Some(&sql_hash)
248        {
249            let failed_ts = status
250                .failed_at
251                .as_deref()
252                .and_then(parse_rfc3339_epoch_secs)
253                .unwrap_or(0);
254            if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
255                let plan_name = plan.name_any();
256                info!(
257                    plan = %plan_name,
258                    policy = %policy_name,
259                    age_secs = now_ts - failed_ts,
260                    "recently-failed plan has identical SQL hash, skipping creation"
261                );
262                return Ok(PlanCreationResult::Deduplicated(plan_name));
263            }
264        }
265    }
266
267    // 6. Generate a plan name using timestamp plus SQL hash. The hash suffix
268    // makes same-second retries after content persistence failures idempotent.
269    let plan_name = generate_plan_name(&policy_name, &sql_hash);
270    let prepared_sql = prepare_plan_sql(&plan_name, &redacted_sql)?;
271
272    // 7. Persist SQL content before materialising the visible plan resource.
273    let sql_configmap_name = create_plan_sql_configmap(
274        client,
275        policy,
276        &namespace,
277        &policy_name,
278        database_identity,
279        &prepared_sql,
280    )
281    .await?;
282
283    // 8. Build ownerReference pointing to the parent policy.
284    let owner_ref = build_owner_reference(policy);
285
286    // 9. Create the plan resource.
287    let plan = PostgresPolicyPlan::new(
288        &plan_name,
289        PostgresPolicyPlanSpec {
290            policy_ref: PolicyPlanRef {
291                name: policy_name.clone(),
292            },
293            policy_generation: generation,
294            reconciliation_mode,
295            owned_roles: inspect_config.managed_roles.clone(),
296            owned_schemas: inspect_config.managed_schemas.clone(),
297            managed_database_identity: database_identity.to_string(),
298        },
299    );
300    let mut plan = plan;
301    plan.metadata.namespace = Some(namespace.clone());
302    plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
303    plan.metadata.labels = Some(BTreeMap::from([
304        (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
305        (
306            LABEL_DATABASE_IDENTITY.to_string(),
307            sanitize_label_value(database_identity),
308        ),
309    ]));
310
311    // Annotations for quick visibility in kubectl describe / Lens.
312    let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
313    let summary_text = format!(
314        "{}R {}G {}D {}DP {}M",
315        change_summary.roles_created + change_summary.roles_altered,
316        change_summary.grants_added,
317        change_summary.default_privileges_set,
318        change_summary.roles_dropped,
319        change_summary.members_added,
320    );
321    plan.metadata.annotations = Some(BTreeMap::from([
322        ("pgroles.io/sql-preview".to_string(), sql_preview),
323        ("pgroles.io/summary".to_string(), summary_text),
324        (
325            "pgroles.io/sql-hash".to_string(),
326            sql_hash[..12].to_string(),
327        ),
328        (
329            "pgroles.io/redacted-sql-hash".to_string(),
330            prepared_sql.redacted_sql_hash[..12].to_string(),
331        ),
332        (
333            "pgroles.io/sql-original-bytes".to_string(),
334            prepared_sql.original_bytes.to_string(),
335        ),
336        (
337            "pgroles.io/sql-stored-bytes".to_string(),
338            prepared_sql.stored_bytes.to_string(),
339        ),
340    ]));
341
342    let (created_plan, created_new_plan) =
343        match plans_api.create(&PostParams::default(), &plan).await {
344            Ok(plan) => (plan, true),
345            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
346                let existing = plans_api.get(&plan_name).await?;
347                if !should_patch_existing_plan_status(&existing) {
348                    return Ok(PlanCreationResult::Deduplicated(existing.name_any()));
349                }
350                (existing, false)
351            }
352            Err(err) => {
353                if let Some(configmap_name) = sql_configmap_name.as_deref() {
354                    delete_configmap_best_effort(client, &namespace, configmap_name).await;
355                }
356                return Err(err.into());
357            }
358        };
359    let plan_name = created_plan.name_any();
360
361    // 11. Update plan status.
362    let computed_message = if prepared_sql.is_truncated() {
363        format!(
364            "Plan computed with {} change(s); SQL preview truncated because compressed SQL exceeded Kubernetes ConfigMap limits",
365            change_summary.total
366        )
367    } else {
368        format!("Plan computed with {} change(s)", change_summary.total)
369    };
370    let plan_status = PostgresPolicyPlanStatus {
371        phase: PlanPhase::Pending,
372        conditions: vec![
373            PolicyCondition {
374                condition_type: "Computed".to_string(),
375                status: "True".to_string(),
376                reason: Some("PlanComputed".to_string()),
377                message: Some(computed_message),
378                last_transition_time: Some(crate::crd::now_rfc3339()),
379            },
380            PolicyCondition {
381                condition_type: "Approved".to_string(),
382                status: "False".to_string(),
383                reason: Some("PendingApproval".to_string()),
384                message: Some("Plan awaiting approval".to_string()),
385                last_transition_time: Some(crate::crd::now_rfc3339()),
386            },
387        ],
388        change_summary: Some(change_summary.clone()),
389        sql_ref: prepared_sql.sql_ref(),
390        sql_inline: prepared_sql.sql_inline(),
391        sql_truncated: prepared_sql.is_truncated(),
392        computed_at: Some(crate::crd::now_rfc3339()),
393        applied_at: None,
394        last_error: None,
395        sql_hash: Some(sql_hash),
396        applying_since: None,
397        failed_at: None,
398        sql_statements: Some(sql_statement_count),
399        redacted_sql_hash: Some(prepared_sql.redacted_sql_hash.clone()),
400        sql_original_bytes: Some(prepared_sql.original_bytes as i64),
401        sql_stored_bytes: Some(prepared_sql.stored_bytes as i64),
402    };
403
404    let status_patch = serde_json::json!({ "status": plan_status });
405    if let Err(err) = plans_api
406        .patch_status(
407            &plan_name,
408            &PatchParams::apply("pgroles-operator"),
409            &Patch::Merge(&status_patch),
410        )
411        .await
412    {
413        if created_new_plan {
414            delete_plan_best_effort(&plans_api, &plan_name).await;
415        }
416        if let Some(configmap_name) = sql_configmap_name.as_deref() {
417            delete_configmap_best_effort(client, &namespace, configmap_name).await;
418        }
419        return Err(err.into());
420    }
421
422    // 12. Mark any existing Pending plans as Superseded after the new plan is
423    // fully visible. This avoids losing the current actionable plan if SQL
424    // persistence fails before the replacement is materialised.
425    for plan in &existing_plans {
426        if let Some(ref status) = plan.status
427            && status.phase == PlanPhase::Pending
428            && plan.name_any() != plan_name
429        {
430            let old_plan_name = plan.name_any();
431            info!(
432                plan = %old_plan_name,
433                policy = %policy_name,
434                "marking existing pending plan as Superseded"
435            );
436            let superseded_status = PostgresPolicyPlanStatus {
437                phase: PlanPhase::Superseded,
438                ..status.clone()
439            };
440            let patch = serde_json::json!({ "status": superseded_status });
441            plans_api
442                .patch_status(
443                    &old_plan_name,
444                    &PatchParams::apply("pgroles-operator"),
445                    &Patch::Merge(&patch),
446                )
447                .await?;
448        }
449    }
450
451    info!(
452        plan = %plan_name,
453        policy = %policy_name,
454        changes = change_summary.total,
455        "created new plan"
456    );
457
458    Ok(PlanCreationResult::Created(plan_name))
459}
460
461// ---------------------------------------------------------------------------
462// Plan execution
463// ---------------------------------------------------------------------------
464
465/// Execute an approved plan against the database.
466///
467/// Re-renders executable SQL from the reconciler's in-memory changes, executes
468/// it in a transaction, and updates the plan status to Applied or Failed.
469/// Persisted SQL on the plan is a redacted review artifact only; apply must not
470/// read it because large plans may store only a truncated preview.
471pub async fn execute_plan(
472    client: &Client,
473    plan: &PostgresPolicyPlan,
474    pool: &sqlx::PgPool,
475    sql_context: &pgroles_core::sql::SqlContext,
476    changes: &[pgroles_core::diff::Change],
477) -> Result<(), ReconcileError> {
478    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
479    let plan_name = plan.name_any();
480    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
481
482    // Update phase to Applying.
483    update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
484
485    // Execute the SQL in a transaction using the original changes (not stored SQL).
486    // This ensures we use the actual executable SQL including real passwords,
487    // not the redacted version stored in the plan.
488    let result = execute_changes_in_transaction(pool, changes, sql_context).await;
489
490    match result {
491        Ok(statements_executed) => {
492            // Update plan status to Applied.
493            let mut applied_status = plan.status.clone().unwrap_or_default();
494            applied_status.phase = PlanPhase::Applied;
495            applied_status.applied_at = Some(crate::crd::now_rfc3339());
496            applied_status.last_error = None;
497            set_plan_condition(
498                &mut applied_status.conditions,
499                "Approved",
500                "True",
501                "Approved",
502                "Plan approved and executed",
503            );
504
505            let patch = serde_json::json!({ "status": applied_status });
506            plans_api
507                .patch_status(
508                    &plan_name,
509                    &PatchParams::apply("pgroles-operator"),
510                    &Patch::Merge(&patch),
511                )
512                .await?;
513
514            info!(
515                plan = %plan_name,
516                statements = statements_executed,
517                "plan executed successfully"
518            );
519            Ok(())
520        }
521        Err(err) => {
522            // Update plan status to Failed.
523            let error_message = err.to_string();
524            let mut failed_status = plan.status.clone().unwrap_or_default();
525            failed_status.phase = PlanPhase::Failed;
526            failed_status.last_error = Some(error_message);
527            failed_status.failed_at = Some(crate::crd::now_rfc3339());
528
529            let patch = serde_json::json!({ "status": failed_status });
530            if let Err(status_err) = plans_api
531                .patch_status(
532                    &plan_name,
533                    &PatchParams::apply("pgroles-operator"),
534                    &Patch::Merge(&patch),
535                )
536                .await
537            {
538                tracing::warn!(
539                    plan = %plan_name,
540                    %status_err,
541                    "failed to update plan status to Failed"
542                );
543            }
544
545            Err(err)
546        }
547    }
548}
549
550fn prepare_plan_sql(
551    plan_name: &str,
552    redacted_sql: &str,
553) -> Result<PreparedPlanSql, ReconcileError> {
554    let original_bytes = redacted_sql.len();
555    let redacted_sql_hash = compute_sql_hash(redacted_sql);
556
557    if original_bytes <= MAX_INLINE_SQL_BYTES {
558        return Ok(PreparedPlanSql {
559            artifact: PlanSqlArtifact::Inline(redacted_sql.to_string()),
560            redacted_sql_hash,
561            original_bytes,
562            stored_bytes: original_bytes,
563        });
564    }
565
566    let compressed_sql = gzip_bytes(redacted_sql.as_bytes())?;
567    if compressed_sql.len() <= MAX_CONFIGMAP_SQL_BYTES {
568        let stored_bytes = compressed_sql.len();
569        return Ok(PreparedPlanSql {
570            artifact: PlanSqlArtifact::CompressedConfigMap {
571                configmap_name: format!("{plan_name}-sql"),
572                key: SQL_CONFIGMAP_GZIP_KEY.to_string(),
573                compressed_sql,
574            },
575            redacted_sql_hash,
576            original_bytes,
577            stored_bytes,
578        });
579    }
580
581    let truncated = truncate_utf8(
582        redacted_sql,
583        MAX_INLINE_SQL_BYTES,
584        "\n-- truncated: compressed SQL preview exceeded Kubernetes ConfigMap limits --",
585    );
586    let stored_bytes = truncated.len();
587    Ok(PreparedPlanSql {
588        artifact: PlanSqlArtifact::TruncatedInline(truncated),
589        redacted_sql_hash,
590        original_bytes,
591        stored_bytes,
592    })
593}
594
595fn gzip_bytes(bytes: &[u8]) -> Result<Vec<u8>, ReconcileError> {
596    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
597    encoder
598        .write_all(bytes)
599        .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))?;
600    encoder
601        .finish()
602        .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))
603}
604
605fn truncate_utf8(text: &str, max_bytes: usize, marker: &str) -> String {
606    if text.len() <= max_bytes {
607        return text.to_string();
608    }
609
610    let target_len = max_bytes.saturating_sub(marker.len());
611    let mut end = target_len.min(text.len());
612    while end > 0 && !text.is_char_boundary(end) {
613        end -= 1;
614    }
615
616    let mut truncated = text[..end].to_string();
617    truncated.push_str(marker);
618    truncated
619}
620
621async fn create_plan_sql_configmap(
622    client: &Client,
623    policy: &PostgresPolicy,
624    namespace: &str,
625    policy_name: &str,
626    database_identity: &str,
627    prepared_sql: &PreparedPlanSql,
628) -> Result<Option<String>, ReconcileError> {
629    let PlanSqlArtifact::CompressedConfigMap {
630        configmap_name,
631        key: _,
632        compressed_sql: _,
633    } = &prepared_sql.artifact
634    else {
635        return Ok(None);
636    };
637
638    let configmap = build_plan_sql_configmap_object(
639        policy,
640        namespace,
641        policy_name,
642        database_identity,
643        prepared_sql,
644    )?;
645
646    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
647    match configmaps_api
648        .create(&PostParams::default(), &configmap)
649        .await
650    {
651        Ok(_) => Ok(Some(configmap_name.clone())),
652        Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
653            let existing = configmaps_api.get(configmap_name).await?;
654            validate_existing_sql_configmap(&existing, prepared_sql)?;
655            Ok(Some(configmap_name.clone()))
656        }
657        Err(err) => Err(err.into()),
658    }
659}
660
661fn build_plan_sql_configmap_object(
662    policy: &PostgresPolicy,
663    namespace: &str,
664    policy_name: &str,
665    database_identity: &str,
666    prepared_sql: &PreparedPlanSql,
667) -> Result<ConfigMap, ReconcileError> {
668    let PlanSqlArtifact::CompressedConfigMap {
669        configmap_name,
670        key,
671        compressed_sql,
672    } = &prepared_sql.artifact
673    else {
674        return Err(ReconcileError::PlanSqlStorage(
675            "cannot build ConfigMap for inline plan SQL".to_string(),
676        ));
677    };
678
679    Ok(ConfigMap {
680        metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
681            name: Some(configmap_name.clone()),
682            namespace: Some(namespace.to_string()),
683            owner_references: Some(vec![build_owner_reference(policy)]),
684            labels: Some(BTreeMap::from([
685                (LABEL_POLICY.to_string(), sanitize_label_value(policy_name)),
686                (
687                    LABEL_DATABASE_IDENTITY.to_string(),
688                    sanitize_label_value(database_identity),
689                ),
690                (
691                    LABEL_PLAN.to_string(),
692                    plan_label_value(configmap_plan_name(configmap_name)),
693                ),
694            ])),
695            annotations: Some(BTreeMap::from([
696                ("pgroles.io/sql-compression".to_string(), "gzip".to_string()),
697                (
698                    "pgroles.io/redacted-sql-hash".to_string(),
699                    prepared_sql.redacted_sql_hash.clone(),
700                ),
701                (
702                    "pgroles.io/sql-original-bytes".to_string(),
703                    prepared_sql.original_bytes.to_string(),
704                ),
705                (
706                    "pgroles.io/sql-stored-bytes".to_string(),
707                    prepared_sql.stored_bytes.to_string(),
708                ),
709            ])),
710            ..Default::default()
711        },
712        binary_data: Some(BTreeMap::from([(
713            key.clone(),
714            ByteString(compressed_sql.clone()),
715        )])),
716        ..Default::default()
717    })
718}
719
/// Recover the plan name from a SQL ConfigMap name by dropping one trailing
/// `-sql`. Names without that suffix are returned as they are.
fn configmap_plan_name(configmap_name: &str) -> &str {
    match configmap_name.strip_suffix("-sql") {
        Some(plan_name) => plan_name,
        None => configmap_name,
    }
}
725
726fn plan_label_value(plan_name: &str) -> String {
727    compute_sql_hash(plan_name)[..32].to_string()
728}
729
730fn validate_existing_sql_configmap(
731    configmap: &ConfigMap,
732    prepared_sql: &PreparedPlanSql,
733) -> Result<(), ReconcileError> {
734    let Some(annotations) = configmap.metadata.annotations.as_ref() else {
735        return Err(ReconcileError::PlanSqlStorage(format!(
736            "existing ConfigMap {} is missing SQL storage annotations",
737            configmap.name_any()
738        )));
739    };
740    let hash_matches = annotations
741        .get("pgroles.io/redacted-sql-hash")
742        .map(|hash| hash == &prepared_sql.redacted_sql_hash)
743        .unwrap_or(false);
744    if hash_matches {
745        Ok(())
746    } else {
747        Err(ReconcileError::PlanSqlStorage(format!(
748            "existing ConfigMap {} does not match computed SQL preview hash",
749            configmap.name_any()
750        )))
751    }
752}
753
754/// Execute SQL changes in a database transaction.
755///
756/// Returns the number of statements executed on success.
757async fn execute_changes_in_transaction(
758    pool: &sqlx::PgPool,
759    changes: &[pgroles_core::diff::Change],
760    sql_context: &pgroles_core::sql::SqlContext,
761) -> Result<usize, ReconcileError> {
762    let mut transaction = pool.begin().await?;
763    let mut statements_executed = 0usize;
764
765    for change in changes {
766        let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
767        for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
768            if is_sensitive {
769                tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
770            } else {
771                tracing::debug!(%sql, "executing");
772            }
773            sqlx::query(&sql).execute(transaction.as_mut()).await?;
774            statements_executed += 1;
775        }
776    }
777
778    transaction.commit().await?;
779    Ok(statements_executed)
780}
781
782// ---------------------------------------------------------------------------
783// Plan cleanup / retention
784// ---------------------------------------------------------------------------
785
786/// Best-effort cleanup wrapper used on hot reconciliation paths. Cleanup should
787/// reduce leaked resources, never block otherwise valid reconciliation.
788pub async fn cleanup_old_plans_best_effort(
789    client: &Client,
790    policy: &PostgresPolicy,
791    max_plans: Option<usize>,
792) {
793    match tokio::time::timeout(
794        Duration::from_secs(CLEANUP_TIMEOUT_SECS),
795        cleanup_old_plans(client, policy, max_plans),
796    )
797    .await
798    {
799        Ok(Ok(())) => {}
800        Ok(Err(err)) => tracing::warn!(%err, "failed to clean up old plans"),
801        Err(_) => tracing::warn!(
802            timeout_secs = CLEANUP_TIMEOUT_SECS,
803            "timed out cleaning up old plans"
804        ),
805    }
806}
807
808/// Clean up old plans for a policy, retaining at most `max_plans` terminal plans.
809///
810/// Terminal plans are those in Applied, Failed, Superseded, or Rejected phase;
811/// Pending, Approved, and Applying plans are retained. Status-less plans and
812/// SQL ConfigMaps older than a short grace period are treated as stale orphans.
813pub async fn cleanup_old_plans(
814    client: &Client,
815    policy: &PostgresPolicy,
816    max_plans: Option<usize>,
817) -> Result<(), ReconcileError> {
818    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
819    let policy_name = policy.name_any();
820    let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
821
822    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
823    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
824    let existing_plans = plans_api
825        .list(&ListParams::default().labels(&label_selector))
826        .await?;
827    let now_ts = now_epoch_secs();
828
829    for plan in existing_plans
830        .iter()
831        .filter(|plan| is_stale_statusless_plan(plan, now_ts))
832    {
833        let plan_name = plan.name_any();
834        info!(
835            plan = %plan_name,
836            policy = %policy_name,
837            "cleaning up stale status-less plan"
838        );
839        if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
840            tracing::warn!(
841                plan = %plan_name,
842                %err,
843                "failed to delete stale status-less plan during cleanup"
844            );
845        }
846    }
847
848    // Collect terminal plans sorted by creation timestamp (oldest first).
849    let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
850        .iter()
851        .filter(|plan| {
852            plan.status
853                .as_ref()
854                .map(|s| {
855                    matches!(
856                        s.phase,
857                        PlanPhase::Applied
858                            | PlanPhase::Failed
859                            | PlanPhase::Superseded
860                            | PlanPhase::Rejected
861                    )
862                })
863                .unwrap_or(false)
864        })
865        .collect();
866
867    if terminal_plans.len() > max_plans {
868        // Sort by creation timestamp ascending (oldest first).
869        terminal_plans.sort_by(|a, b| {
870            let a_time = a.metadata.creation_timestamp.as_ref();
871            let b_time = b.metadata.creation_timestamp.as_ref();
872            a_time.cmp(&b_time)
873        });
874
875        let plans_to_delete = terminal_plans.len() - max_plans;
876        for plan in terminal_plans.into_iter().take(plans_to_delete) {
877            let plan_name = plan.name_any();
878            info!(
879                plan = %plan_name,
880                policy = %policy_name,
881                "cleaning up old plan"
882            );
883            if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
884                tracing::warn!(
885                    plan = %plan_name,
886                    %err,
887                    "failed to delete old plan during cleanup"
888                );
889            }
890        }
891    }
892
893    cleanup_orphan_sql_configmaps(
894        client,
895        &namespace,
896        &policy_name,
897        &existing_plans.items,
898        now_ts,
899    )
900    .await?;
901
902    Ok(())
903}
904
905// ---------------------------------------------------------------------------
906// Helpers
907// ---------------------------------------------------------------------------
908
909async fn cleanup_orphan_sql_configmaps(
910    client: &Client,
911    namespace: &str,
912    policy_name: &str,
913    existing_plans: &[PostgresPolicyPlan],
914    now_ts: i64,
915) -> Result<(), ReconcileError> {
916    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
917    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(policy_name));
918    let configmaps = configmaps_api
919        .list(&ListParams::default().labels(&label_selector))
920        .await?;
921    let known_plan_labels: BTreeSet<String> = existing_plans
922        .iter()
923        .map(|plan| plan_label_value(&plan.name_any()))
924        .collect();
925    let known_plan_names: BTreeSet<String> =
926        existing_plans.iter().map(ResourceExt::name_any).collect();
927
928    for configmap in configmaps {
929        if !is_orphan_sql_configmap(&configmap, &known_plan_names, &known_plan_labels, now_ts) {
930            continue;
931        }
932
933        let configmap_name = configmap.name_any();
934        info!(
935            configmap = %configmap_name,
936            policy = %policy_name,
937            "cleaning up orphan plan SQL ConfigMap"
938        );
939        if let Err(err) = configmaps_api
940            .delete(&configmap_name, &DeleteParams::default())
941            .await
942        {
943            tracing::warn!(
944                configmap = %configmap_name,
945                %err,
946                "failed to delete orphan plan SQL ConfigMap during cleanup"
947            );
948        }
949    }
950
951    Ok(())
952}
953
954fn is_orphan_sql_configmap(
955    configmap: &ConfigMap,
956    known_plan_names: &BTreeSet<String>,
957    known_plan_labels: &BTreeSet<String>,
958    now_ts: i64,
959) -> bool {
960    let Some(labels) = configmap.metadata.labels.as_ref() else {
961        return false;
962    };
963    if !labels.contains_key(LABEL_POLICY) || !is_stale_object(configmap, now_ts) {
964        return false;
965    }
966    if known_plan_names.contains(configmap_plan_name(&configmap.name_any())) {
967        return false;
968    }
969    labels
970        .get(LABEL_PLAN)
971        .map(|plan_label| !known_plan_labels.contains(plan_label))
972        .unwrap_or(true)
973}
974
975fn should_patch_existing_plan_status(plan: &PostgresPolicyPlan) -> bool {
976    plan.status
977        .as_ref()
978        .map(|status| status.phase == PlanPhase::Pending)
979        .unwrap_or(true)
980}
981
982fn is_stale_statusless_plan(plan: &PostgresPolicyPlan, now_ts: i64) -> bool {
983    plan.status.is_none() && is_stale_object(plan, now_ts)
984}
985
986fn is_stale_object<K>(resource: &K, now_ts: i64) -> bool
987where
988    K: Resource,
989{
990    resource
991        .meta()
992        .creation_timestamp
993        .as_ref()
994        .map(|timestamp| now_ts.saturating_sub(timestamp.0.as_second()) > ORPHAN_GRACE_SECS)
995        .unwrap_or(false)
996}
997
998async fn delete_plan_best_effort(plans_api: &Api<PostgresPolicyPlan>, plan_name: &str) {
999    if let Err(err) = plans_api.delete(plan_name, &DeleteParams::default()).await {
1000        tracing::warn!(
1001            plan = %plan_name,
1002            %err,
1003            "failed to roll back plan after status update failure"
1004        );
1005    }
1006}
1007
1008async fn delete_configmap_best_effort(client: &Client, namespace: &str, configmap_name: &str) {
1009    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
1010    if let Err(err) = configmaps_api
1011        .delete(configmap_name, &DeleteParams::default())
1012        .await
1013    {
1014        tracing::warn!(
1015            configmap = %configmap_name,
1016            %err,
1017            "failed to roll back plan SQL ConfigMap"
1018        );
1019    }
1020}
1021
1022/// Render the full executable SQL from changes (including real passwords).
1023pub(crate) fn render_full_sql(
1024    changes: &[pgroles_core::diff::Change],
1025    sql_context: &pgroles_core::sql::SqlContext,
1026) -> String {
1027    changes
1028        .iter()
1029        .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
1030        .collect::<Vec<_>>()
1031        .join("\n")
1032}
1033
1034/// Render redacted SQL for display (passwords replaced with [REDACTED]).
1035fn render_redacted_sql(
1036    changes: &[pgroles_core::diff::Change],
1037    sql_context: &pgroles_core::sql::SqlContext,
1038) -> String {
1039    changes
1040        .iter()
1041        .flat_map(|change| {
1042            if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
1043                vec![format!(
1044                    "ALTER ROLE {} PASSWORD '[REDACTED]';",
1045                    pgroles_core::sql::quote_ident(name)
1046                )]
1047            } else {
1048                pgroles_core::sql::render_statements_with_context(change, sql_context)
1049            }
1050        })
1051        .collect::<Vec<_>>()
1052        .join("\n")
1053}
1054
1055/// Compute SHA-256 hash of the SQL string as a hex digest.
1056pub(crate) fn compute_sql_hash(sql: &str) -> String {
1057    use std::fmt::Write as _;
1058
1059    let mut hasher = Sha256::new();
1060    hasher.update(sql.as_bytes());
1061    let digest = hasher.finalize();
1062    let mut hex = String::with_capacity(digest.len() * 2);
1063    for byte in digest {
1064        write!(&mut hex, "{byte:02x}").expect("writing to a string should succeed");
1065    }
1066    hex
1067}
1068
1069/// Generate a plan name from policy name, current timestamp, and SQL hash.
1070///
1071/// Format: `{policy-name}-plan-{YYYYMMDD-HHMMSS}-{hash-prefix}`
1072///
1073/// The hash suffix makes retries within the same second idempotent if SQL
1074/// content persistence succeeds but plan creation fails.
1075fn generate_plan_name(policy_name: &str, sql_hash: &str) -> String {
1076    let timestamp = format_timestamp_compact();
1077    let suffix = &sql_hash[..12.min(sql_hash.len())];
1078    // Kubernetes names must be <= 253 chars and DNS-compatible.
1079    // Reserve 4 chars for the potential "-sql" ConfigMap suffix.
1080    let max_name_len = 253 - 4; // 249
1081    let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
1082    let prefix = if policy_name.len() > max_prefix_len {
1083        policy_name
1084            .char_indices()
1085            .take_while(|(idx, ch)| idx + ch.len_utf8() <= max_prefix_len)
1086            .map(|(_, ch)| ch)
1087            .collect::<String>()
1088    } else {
1089        policy_name.to_string()
1090    };
1091    format!("{prefix}-plan-{timestamp}-{suffix}")
1092}
1093
1094/// Format the current UTC time as `YYYYMMDD-HHMMSS`.
1095fn format_timestamp_compact() -> String {
1096    use std::time::SystemTime;
1097    let now = SystemTime::now()
1098        .duration_since(SystemTime::UNIX_EPOCH)
1099        .unwrap_or_default();
1100    let secs = now.as_secs();
1101    let (year, month, day) = crate::crd::days_to_date(secs / 86400);
1102    let remaining = secs % 86400;
1103    let hours = remaining / 3600;
1104    let minutes = (remaining % 3600) / 60;
1105    let seconds = remaining % 60;
1106    format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
1107}
1108
/// Sanitize a string for use as a Kubernetes label value.
///
/// Label values must be <= 63 chars, consist of `[a-z0-9A-Z._-]`, and, unless
/// empty, begin and end with an alphanumeric character.
///
/// Every character outside the allowed set is replaced with `_`, the result is
/// cut to 63 characters, and any leading or trailing non-alphanumeric
/// characters are trimmed. The trim covers inputs such as `"/foo"` and cuts
/// that land on a separator. Values that were already valid are returned
/// unchanged. The result may be empty.
fn sanitize_label_value(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_') {
                c
            } else {
                '_'
            }
        })
        .take(63)
        .collect();

    // The API server rejects label values that start or end with a separator.
    sanitized
        .trim_matches(|c: char| !c.is_ascii_alphanumeric())
        .to_string()
}
1126
/// Current time as Unix epoch seconds (for dedup window checks).
///
/// A system clock set before the Unix epoch yields `0`. The conversion to
/// `i64` is checked and saturates at `i64::MAX` instead of silently wrapping.
fn now_epoch_secs() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX)
}
1134
1135/// Parse an RFC 3339 timestamp string to Unix epoch seconds.
1136/// Returns `None` if parsing fails.
1137fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
1138    // Use jiff (already a transitive dep via k8s-openapi) for RFC 3339 parsing.
1139    rfc3339
1140        .parse::<jiff::Timestamp>()
1141        .ok()
1142        .map(|t| t.as_second())
1143}
1144
1145/// Build an OwnerReference pointing from a plan to its parent policy.
1146fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
1147    OwnerReference {
1148        api_version: PostgresPolicy::api_version(&()).to_string(),
1149        kind: PostgresPolicy::kind(&()).to_string(),
1150        name: policy.name_any(),
1151        uid: policy.metadata.uid.clone().unwrap_or_default(),
1152        controller: Some(true),
1153        block_owner_deletion: Some(true),
1154    }
1155}
1156
1157/// Update the phase field on a plan's status.
1158///
1159/// When transitioning to `Applying`, also sets `applying_since` for stuck
1160/// plan detection.
1161async fn update_plan_phase(
1162    plans_api: &Api<PostgresPolicyPlan>,
1163    plan_name: &str,
1164    phase: PlanPhase,
1165) -> Result<(), ReconcileError> {
1166    let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
1167    if phase == PlanPhase::Applying {
1168        patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
1169    }
1170    plans_api
1171        .patch_status(
1172            plan_name,
1173            &PatchParams::apply("pgroles-operator"),
1174            &Patch::Merge(&patch_value),
1175        )
1176        .await?;
1177    Ok(())
1178}
1179
1180/// Set or update a condition in a conditions list.
1181///
1182/// Preserves `last_transition_time` when the status value is unchanged
1183/// (only reason/message changed), matching Kubernetes condition conventions.
1184fn set_plan_condition(
1185    conditions: &mut Vec<PolicyCondition>,
1186    condition_type: &str,
1187    status: &str,
1188    reason: &str,
1189    message: &str,
1190) {
1191    let transition_time = if let Some(existing) = conditions
1192        .iter()
1193        .find(|c| c.condition_type == condition_type)
1194    {
1195        if existing.status == status {
1196            existing.last_transition_time.clone()
1197        } else {
1198            Some(crate::crd::now_rfc3339())
1199        }
1200    } else {
1201        Some(crate::crd::now_rfc3339())
1202    };
1203
1204    let condition = PolicyCondition {
1205        condition_type: condition_type.to_string(),
1206        status: status.to_string(),
1207        reason: Some(reason.to_string()),
1208        message: Some(message.to_string()),
1209        last_transition_time: transition_time,
1210    };
1211    if let Some(existing) = conditions
1212        .iter_mut()
1213        .find(|c| c.condition_type == condition_type)
1214    {
1215        *existing = condition;
1216    } else {
1217        conditions.push(condition);
1218    }
1219}
1220
1221/// Update the parent policy's `current_plan_ref` in status.
1222pub async fn update_policy_plan_ref(
1223    client: &Client,
1224    policy: &PostgresPolicy,
1225    plan_name: &str,
1226) -> Result<(), ReconcileError> {
1227    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1228    let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
1229
1230    let patch = serde_json::json!({
1231        "status": {
1232            "current_plan_ref": PlanReference {
1233                name: plan_name.to_string(),
1234            }
1235        }
1236    });
1237
1238    policy_api
1239        .patch_status(
1240            &policy.name_any(),
1241            &PatchParams::apply("pgroles-operator"),
1242            &Patch::Merge(&patch),
1243        )
1244        .await?;
1245
1246    Ok(())
1247}
1248
1249/// Look up the current actionable plan for a policy, if any.
1250///
1251/// An actionable plan is one in `Pending` or `Approved` phase — i.e. a plan
1252/// that the reconciler should evaluate for approval/execution.
1253pub async fn get_current_actionable_plan(
1254    client: &Client,
1255    policy: &PostgresPolicy,
1256) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1257    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1258    let policy_name = policy.name_any();
1259
1260    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1261    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1262    let existing_plans = plans_api
1263        .list(&ListParams::default().labels(&label_selector))
1264        .await?;
1265
1266    // Find the most recent actionable plan (Pending or Approved, by creation time).
1267    let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
1268        .into_iter()
1269        .filter(|plan| {
1270            plan.status
1271                .as_ref()
1272                .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
1273                .unwrap_or(false)
1274        })
1275        .collect();
1276
1277    pending_plans.sort_by(|a, b| {
1278        let a_time = a.metadata.creation_timestamp.as_ref();
1279        let b_time = b.metadata.creation_timestamp.as_ref();
1280        b_time.cmp(&a_time) // newest first
1281    });
1282
1283    Ok(pending_plans.into_iter().next())
1284}
1285
1286/// Look up the most recent plan for a policy in a given phase.
1287pub async fn get_plan_by_phase(
1288    client: &Client,
1289    policy: &PostgresPolicy,
1290    target_phase: PlanPhase,
1291) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1292    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1293    let policy_name = policy.name_any();
1294
1295    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1296    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1297    let existing_plans = plans_api
1298        .list(&ListParams::default().labels(&label_selector))
1299        .await?;
1300
1301    let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
1302        .into_iter()
1303        .filter(|plan| {
1304            plan.status
1305                .as_ref()
1306                .map(|s| s.phase == target_phase)
1307                .unwrap_or(false)
1308        })
1309        .collect();
1310
1311    matching_plans.sort_by(|a, b| {
1312        let a_time = a.metadata.creation_timestamp.as_ref();
1313        let b_time = b.metadata.creation_timestamp.as_ref();
1314        b_time.cmp(&a_time) // newest first
1315    });
1316
1317    Ok(matching_plans.into_iter().next())
1318}
1319
1320/// Mark a plan as Failed with a given error message.
1321pub async fn mark_plan_failed(
1322    client: &Client,
1323    plan: &PostgresPolicyPlan,
1324    error_message: &str,
1325) -> Result<(), ReconcileError> {
1326    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1327    let plan_name = plan.name_any();
1328    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1329
1330    let mut status = plan.status.clone().unwrap_or_default();
1331    status.phase = PlanPhase::Failed;
1332    status.last_error = Some(error_message.to_string());
1333    status.failed_at = Some(crate::crd::now_rfc3339());
1334
1335    let patch = serde_json::json!({ "status": status });
1336    plans_api
1337        .patch_status(
1338            &plan_name,
1339            &PatchParams::apply("pgroles-operator"),
1340            &Patch::Merge(&patch),
1341        )
1342        .await?;
1343
1344    info!(
1345        plan = %plan_name,
1346        "marked stuck Applying plan as Failed"
1347    );
1348
1349    Ok(())
1350}
1351
1352/// Mark a plan as Approved.
1353///
1354/// Callers provide `reason` and `message` to distinguish auto-approval from
1355/// manual approval in the plan's conditions.
1356pub async fn mark_plan_approved(
1357    client: &Client,
1358    plan: &PostgresPolicyPlan,
1359    reason: &str,
1360    message: &str,
1361) -> Result<(), ReconcileError> {
1362    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1363    let plan_name = plan.name_any();
1364    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1365
1366    let mut status = plan.status.clone().unwrap_or_default();
1367    status.phase = PlanPhase::Approved;
1368    set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
1369
1370    let patch = serde_json::json!({ "status": status });
1371    plans_api
1372        .patch_status(
1373            &plan_name,
1374            &PatchParams::apply("pgroles-operator"),
1375            &Patch::Merge(&patch),
1376        )
1377        .await?;
1378
1379    Ok(())
1380}
1381
1382/// Mark a plan as Rejected.
1383pub async fn mark_plan_rejected(
1384    client: &Client,
1385    plan: &PostgresPolicyPlan,
1386) -> Result<(), ReconcileError> {
1387    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1388    let plan_name = plan.name_any();
1389    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1390
1391    let mut status = plan.status.clone().unwrap_or_default();
1392    status.phase = PlanPhase::Rejected;
1393    set_plan_condition(
1394        &mut status.conditions,
1395        "Approved",
1396        "False",
1397        "Rejected",
1398        "Plan rejected via annotation",
1399    );
1400
1401    let patch = serde_json::json!({ "status": status });
1402    plans_api
1403        .patch_status(
1404            &plan_name,
1405            &PatchParams::apply("pgroles-operator"),
1406            &Patch::Merge(&patch),
1407        )
1408        .await?;
1409
1410    Ok(())
1411}
1412
1413/// Mark a plan as Superseded (database state changed since approval).
1414pub async fn mark_plan_superseded(
1415    client: &Client,
1416    plan: &PostgresPolicyPlan,
1417) -> Result<(), ReconcileError> {
1418    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1419    let plan_name = plan.name_any();
1420    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1421
1422    let mut status = plan.status.clone().unwrap_or_default();
1423    status.phase = PlanPhase::Superseded;
1424    set_plan_condition(
1425        &mut status.conditions,
1426        "Approved",
1427        "False",
1428        "Superseded",
1429        "Database state changed since plan was approved",
1430    );
1431
1432    let patch = serde_json::json!({ "status": status });
1433    plans_api
1434        .patch_status(
1435            &plan_name,
1436            &PatchParams::apply("pgroles-operator"),
1437            &Patch::Merge(&patch),
1438        )
1439        .await?;
1440
1441    Ok(())
1442}
1443
1444// ---------------------------------------------------------------------------
1445// Tests
1446// ---------------------------------------------------------------------------
1447
1448#[cfg(test)]
1449mod tests {
1450    use super::*;
1451    use crate::crd::CrdReconciliationMode;
1452    use base64::Engine as _;
1453    use flate2::read::GzDecoder;
1454    use std::io::Read;
1455
1456    fn test_plan(
1457        name: &str,
1458        phase: PlanPhase,
1459        annotations: Option<BTreeMap<String, String>>,
1460    ) -> PostgresPolicyPlan {
1461        let mut plan = PostgresPolicyPlan::new(
1462            name,
1463            PostgresPolicyPlanSpec {
1464                policy_ref: PolicyPlanRef {
1465                    name: "test-policy".to_string(),
1466                },
1467                policy_generation: 1,
1468                reconciliation_mode: CrdReconciliationMode::Authoritative,
1469                owned_roles: vec!["role-a".to_string()],
1470                owned_schemas: vec!["public".to_string()],
1471                managed_database_identity: "default/db/DATABASE_URL".to_string(),
1472            },
1473        );
1474        plan.metadata.namespace = Some("default".to_string());
1475        plan.metadata.annotations = annotations;
1476        plan.status = Some(PostgresPolicyPlanStatus {
1477            phase,
1478            ..Default::default()
1479        });
1480        plan
1481    }
1482
1483    #[test]
1484    fn check_plan_approval_pending_when_no_annotations() {
1485        let plan = test_plan("plan-1", PlanPhase::Pending, None);
1486        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1487    }
1488
1489    #[test]
1490    fn check_plan_approval_approved_with_annotation() {
1491        let annotations =
1492            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1493        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1494        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1495    }
1496
1497    #[test]
1498    fn check_plan_approval_rejected_with_annotation() {
1499        let annotations =
1500            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1501        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1502        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1503    }
1504
1505    #[test]
1506    fn check_plan_approval_rejected_wins_over_approved() {
1507        let annotations = BTreeMap::from([
1508            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1509            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1510        ]);
1511        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1512        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1513    }
1514
1515    #[test]
1516    fn check_plan_approval_non_true_value_is_pending() {
1517        let annotations =
1518            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1519        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1520        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1521    }
1522
1523    #[test]
1524    fn compute_sql_hash_is_deterministic() {
1525        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1526        let hash1 = compute_sql_hash(sql);
1527        let hash2 = compute_sql_hash(sql);
1528        assert_eq!(hash1, hash2);
1529        assert_eq!(hash1.len(), 64); // SHA-256 hex digest is 64 chars
1530    }
1531
1532    #[test]
1533    fn compute_sql_hash_differs_for_different_sql() {
1534        let hash1 = compute_sql_hash("CREATE ROLE a;");
1535        let hash2 = compute_sql_hash("CREATE ROLE b;");
1536        assert_ne!(hash1, hash2);
1537    }
1538
1539    #[test]
1540    fn compute_sql_hash_matches_pinned_fixture() {
1541        assert_eq!(
1542            compute_sql_hash("CREATE ROLE app LOGIN;"),
1543            "12a9743285d98ce73cfa9c840e943fc627d1fcbce22c5206fda1b21c84c1ac9c"
1544        );
1545    }
1546
1547    #[test]
1548    fn generate_plan_name_has_expected_format() {
1549        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1550        let name = generate_plan_name("my-policy", hash);
1551        assert!(name.starts_with("my-policy-plan-"));
1552        assert!(name.ends_with("-abcdef012345"));
1553        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1554        // YYYYMMDD-HHMMSS-hashprefix = 15 + 1 + 12 = 28 chars
1555        assert_eq!(suffix.len(), 28);
1556        assert_eq!(&suffix[8..9], "-");
1557        assert_eq!(&suffix[15..16], "-");
1558    }
1559
1560    #[test]
1561    fn generate_plan_name_is_idempotent_for_same_hash_in_same_second() {
1562        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1563        let name1 = generate_plan_name("my-policy", hash);
1564        let name2 = generate_plan_name("my-policy", hash);
1565        assert_eq!(name1, name2);
1566    }
1567
1568    #[test]
1569    fn generate_plan_name_truncates_on_utf8_boundary() {
1570        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1571        let name = generate_plan_name(&"é".repeat(140), hash);
1572        assert!(name.len() <= 249);
1573        assert!(name.ends_with("-abcdef012345"));
1574    }
1575
1576    #[test]
1577    fn plan_label_value_is_stable_and_label_safe_for_long_names() {
1578        let plan_name = "very-long-policy-name-".repeat(20);
1579        let label = plan_label_value(&plan_name);
1580        assert_eq!(label, plan_label_value(&plan_name));
1581        assert_eq!(label.len(), 32);
1582        assert!(label.chars().all(|ch| ch.is_ascii_hexdigit()));
1583    }
1584
1585    #[test]
1586    fn existing_non_pending_plan_status_is_not_repatched_on_create_conflict() {
1587        let approved = test_plan("plan-1", PlanPhase::Approved, None);
1588        let applying = test_plan("plan-1", PlanPhase::Applying, None);
1589        let applied = test_plan("plan-1", PlanPhase::Applied, None);
1590
1591        assert!(!should_patch_existing_plan_status(&approved));
1592        assert!(!should_patch_existing_plan_status(&applying));
1593        assert!(!should_patch_existing_plan_status(&applied));
1594    }
1595
1596    #[test]
1597    fn existing_pending_or_statusless_plan_can_be_patched_on_create_conflict() {
1598        let pending = test_plan("plan-1", PlanPhase::Pending, None);
1599        let mut statusless = pending.clone();
1600        statusless.status = None;
1601
1602        assert!(should_patch_existing_plan_status(&pending));
1603        assert!(should_patch_existing_plan_status(&statusless));
1604    }
1605
1606    #[test]
1607    fn prepare_plan_sql_keeps_small_sql_inline() {
1608        let prepared = prepare_plan_sql("plan-1", "CREATE ROLE app LOGIN;").unwrap();
1609
1610        assert!(matches!(prepared.artifact, PlanSqlArtifact::Inline(_)));
1611        assert_eq!(
1612            prepared.sql_inline(),
1613            Some("CREATE ROLE app LOGIN;".to_string())
1614        );
1615        assert!(prepared.sql_ref().is_none());
1616        assert!(!prepared.is_truncated());
1617    }
1618
1619    #[test]
1620    fn prepare_plan_sql_compresses_large_brownfield_sized_sql() {
1621        let sql = brownfield_sized_sql();
1622        assert!(sql.len() > 1_048_576);
1623
1624        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1625
1626        let PlanSqlArtifact::CompressedConfigMap {
1627            key,
1628            compressed_sql,
1629            ..
1630        } = &prepared.artifact
1631        else {
1632            panic!("expected compressed ConfigMap artifact");
1633        };
1634        assert_eq!(key, SQL_CONFIGMAP_GZIP_KEY);
1635        assert!(compressed_sql.len() < MAX_CONFIGMAP_SQL_BYTES);
1636        assert_eq!(gunzip(compressed_sql), sql);
1637        assert_eq!(
1638            prepared.sql_ref().unwrap().compression,
1639            Some(SqlCompression::Gzip)
1640        );
1641        assert_eq!(prepared.original_bytes, sql.len());
1642        assert_eq!(prepared.stored_bytes, compressed_sql.len());
1643    }
1644
1645    #[test]
1646    fn configmap_binary_data_serializes_with_one_base64_layer() {
1647        let sql = brownfield_sized_sql();
1648        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1649        let PlanSqlArtifact::CompressedConfigMap {
1650            key,
1651            compressed_sql,
1652            ..
1653        } = &prepared.artifact
1654        else {
1655            panic!("expected compressed ConfigMap artifact");
1656        };
1657        let configmap = ConfigMap {
1658            binary_data: Some(BTreeMap::from([(
1659                key.clone(),
1660                ByteString(compressed_sql.clone()),
1661            )])),
1662            ..Default::default()
1663        };
1664
1665        let encoded = serde_json::to_value(&configmap).unwrap()["binaryData"][key]
1666            .as_str()
1667            .unwrap()
1668            .to_string();
1669        let decoded = base64::engine::general_purpose::STANDARD
1670            .decode(encoded)
1671            .unwrap();
1672
1673        assert_eq!(decoded, *compressed_sql);
1674        assert_eq!(gunzip(&decoded), sql);
1675    }
1676
1677    #[test]
1678    fn prepare_plan_sql_truncates_when_compressed_sql_is_still_too_large() {
1679        let sql = deterministic_incompressible_sql(1_400_000);
1680        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1681
1682        let PlanSqlArtifact::TruncatedInline(preview) = &prepared.artifact else {
1683            panic!("expected truncated inline artifact");
1684        };
1685        assert!(preview.len() <= MAX_INLINE_SQL_BYTES);
1686        assert!(preview.contains("truncated"));
1687        assert!(prepared.sql_ref().is_none());
1688        assert!(prepared.is_truncated());
1689    }
1690
1691    #[test]
1692    fn sanitize_label_value_replaces_slashes() {
1693        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1694        assert!(!sanitized.contains('/'));
1695        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1696    }
1697
1698    #[test]
1699    fn sanitize_label_value_truncates_to_63_chars() {
1700        let long_value = "a".repeat(100);
1701        let sanitized = sanitize_label_value(&long_value);
1702        assert!(sanitized.len() <= 63);
1703    }
1704
1705    #[test]
1706    fn stale_policy_sql_configmap_without_plan_label_is_orphan() {
1707        let configmap = ConfigMap {
1708            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1709                labels: Some(BTreeMap::from([(
1710                    LABEL_POLICY.to_string(),
1711                    sanitize_label_value("test-policy"),
1712                )])),
1713                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1714                    jiff::Timestamp::from_second(0).unwrap(),
1715                )),
1716                ..Default::default()
1717            },
1718            ..Default::default()
1719        };
1720
1721        assert!(is_orphan_sql_configmap(
1722            &configmap,
1723            &BTreeSet::new(),
1724            &BTreeSet::new(),
1725            ORPHAN_GRACE_SECS + 1
1726        ));
1727    }
1728
1729    #[test]
1730    fn stale_policy_sql_configmap_with_current_plan_name_is_not_orphan() {
1731        let plan_name = "test-policy-plan-20260506-000000-abcdef012345";
1732        let configmap = ConfigMap {
1733            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1734                name: Some(format!("{plan_name}-sql")),
1735                labels: Some(BTreeMap::from([
1736                    (
1737                        LABEL_POLICY.to_string(),
1738                        sanitize_label_value("test-policy"),
1739                    ),
1740                    (
1741                        LABEL_PLAN.to_string(),
1742                        sanitize_label_value("legacy-colliding-label"),
1743                    ),
1744                ])),
1745                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1746                    jiff::Timestamp::from_second(0).unwrap(),
1747                )),
1748                ..Default::default()
1749            },
1750            ..Default::default()
1751        };
1752
1753        assert!(!is_orphan_sql_configmap(
1754            &configmap,
1755            &BTreeSet::from([plan_name.to_string()]),
1756            &BTreeSet::new(),
1757            ORPHAN_GRACE_SECS + 1
1758        ));
1759    }
1760
1761    #[test]
1762    fn stale_policy_sql_configmap_with_known_hash_plan_label_is_not_orphan() {
1763        let plan_name = "test-policy-plan-20260506-000000-abcdef012345";
1764        let plan_label = plan_label_value(plan_name);
1765        let configmap = ConfigMap {
1766            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1767                name: Some("different-plan-sql".to_string()),
1768                labels: Some(BTreeMap::from([
1769                    (
1770                        LABEL_POLICY.to_string(),
1771                        sanitize_label_value("test-policy"),
1772                    ),
1773                    (LABEL_PLAN.to_string(), plan_label.clone()),
1774                ])),
1775                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1776                    jiff::Timestamp::from_second(0).unwrap(),
1777                )),
1778                ..Default::default()
1779            },
1780            ..Default::default()
1781        };
1782
1783        assert!(!is_orphan_sql_configmap(
1784            &configmap,
1785            &BTreeSet::new(),
1786            &BTreeSet::from([plan_label]),
1787            ORPHAN_GRACE_SECS + 1
1788        ));
1789    }
1790
1791    #[test]
1792    fn stale_policy_sql_configmap_with_only_legacy_colliding_label_is_orphan() {
1793        let plan_name =
1794            "very-long-policy-name-that-would-have-collided-plan-20260506-000000-abcdef012345";
1795        let legacy_label = sanitize_label_value(plan_name);
1796        let configmap = ConfigMap {
1797            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1798                name: Some("deleted-historical-plan-sql".to_string()),
1799                labels: Some(BTreeMap::from([
1800                    (
1801                        LABEL_POLICY.to_string(),
1802                        sanitize_label_value("test-policy"),
1803                    ),
1804                    (LABEL_PLAN.to_string(), legacy_label.clone()),
1805                ])),
1806                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1807                    jiff::Timestamp::from_second(0).unwrap(),
1808                )),
1809                ..Default::default()
1810            },
1811            ..Default::default()
1812        };
1813
1814        assert!(is_orphan_sql_configmap(
1815            &configmap,
1816            &BTreeSet::new(),
1817            &BTreeSet::new(),
1818            ORPHAN_GRACE_SECS + 1
1819        ));
1820    }
1821
1822    #[test]
1823    fn render_redacted_sql_masks_passwords() {
1824        let changes = vec![
1825            pgroles_core::diff::Change::CreateRole {
1826                name: "app".to_string(),
1827                state: pgroles_core::model::RoleState {
1828                    login: true,
1829                    ..pgroles_core::model::RoleState::default()
1830                },
1831            },
1832            pgroles_core::diff::Change::SetPassword {
1833                name: "app".to_string(),
1834                password: "super_secret".to_string(),
1835            },
1836        ];
1837        let ctx = pgroles_core::sql::SqlContext::default();
1838        let redacted = render_redacted_sql(&changes, &ctx);
1839
1840        assert!(redacted.contains("[REDACTED]"));
1841        assert!(!redacted.contains("super_secret"));
1842        assert!(redacted.contains("CREATE ROLE"));
1843    }
1844
1845    #[test]
1846    fn render_full_sql_includes_passwords() {
1847        let changes = vec![pgroles_core::diff::Change::SetPassword {
1848            name: "app".to_string(),
1849            password: "super_secret".to_string(),
1850        }];
1851        let ctx = pgroles_core::sql::SqlContext::default();
1852        let full = render_full_sql(&changes, &ctx);
1853
1854        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1855    }
1856
1857    #[test]
1858    fn now_epoch_secs_returns_plausible_value() {
1859        let now = now_epoch_secs();
1860        // Should be after 2025-01-01 and before 2100-01-01.
1861        let y2025 = 1_735_689_600_i64;
1862        let y2100 = 4_102_444_800_i64;
1863        assert!(
1864            now > y2025 && now < y2100,
1865            "epoch secs {now} should be between 2025 and 2100"
1866        );
1867    }
1868
1869    fn brownfield_sized_sql() -> String {
1870        let mut sql = String::new();
1871        for schema in 0..33 {
1872            for profile in ["reader", "writer", "owner", "cdc"] {
1873                let role = format!("schema_{schema}_{profile}");
1874                sql.push_str(&format!(
1875                    "CREATE ROLE \"{role}\" LOGIN;\nCOMMENT ON ROLE \"{role}\" IS 'Generated from profile {profile} for brownfield migration schema {schema} with cdc ownership directives and review metadata';\n"
1876                ));
1877                for relkind in ["TABLES", "SEQUENCES", "FUNCTIONS"] {
1878                    sql.push_str(&format!(
1879                        "GRANT SELECT ON ALL {relkind} IN SCHEMA \"schema_{schema}\" TO \"{role}\";\n"
1880                    ));
1881                }
1882                for owner in 0..20 {
1883                    sql.push_str(&format!(
1884                        "ALTER DEFAULT PRIVILEGES FOR ROLE \"owner_{owner}\" IN SCHEMA \"schema_{schema}\" GRANT SELECT ON TABLES TO \"{role}\";\n"
1885                    ));
1886                }
1887            }
1888        }
1889        for member in 0..70 {
1890            sql.push_str(&format!(
1891                "GRANT \"group_{member}\" TO \"service_login_{}\";\n",
1892                member % 20
1893            ));
1894        }
1895        while sql.len() <= 1_100_000 {
1896            sql.push_str("-- brownfield migration padding for large plan regression\n");
1897        }
1898        sql
1899    }
1900
1901    fn deterministic_incompressible_sql(target_bytes: usize) -> String {
1902        let mut state = 0x1234_5678_u64;
1903        let mut sql = String::with_capacity(target_bytes);
1904        while sql.len() < target_bytes {
1905            state ^= state << 13;
1906            state ^= state >> 7;
1907            state ^= state << 17;
1908            let value = (state % 62) as u8;
1909            let ch = match value {
1910                0..=9 => b'0' + value,
1911                10..=35 => b'a' + (value - 10),
1912                _ => b'A' + (value - 36),
1913            };
1914            sql.push(ch as char);
1915            if sql.len().is_multiple_of(120) {
1916                sql.push('\n');
1917            }
1918        }
1919        sql
1920    }
1921
1922    fn gunzip(bytes: &[u8]) -> String {
1923        let mut decoder = GzDecoder::new(bytes);
1924        let mut decoded = String::new();
1925        decoder.read_to_string(&mut decoded).unwrap();
1926        decoded
1927    }
1928}