Skip to main content

pgroles_operator/
plan.rs

1//! Plan lifecycle management for `PostgresPolicyPlan` resources.
2//!
3//! Handles creating, deduplicating, approving, executing, and cleaning up
4//! reconciliation plans. Plans represent computed SQL change sets that may
5//! require explicit approval before execution against a database.
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::io::Write;
9use std::time::Duration;
10
11use flate2::Compression;
12use flate2::write::GzEncoder;
13use k8s_openapi::ByteString;
14use k8s_openapi::api::core::v1::ConfigMap;
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
16use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams};
17use kube::{Client, Resource, ResourceExt};
18use sha2::{Digest, Sha256};
19use tracing::info;
20
21use crate::crd::{
22    ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_PLAN, LABEL_POLICY,
23    PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
24    PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
25    PostgresPolicyPlanStatus, SqlCompression, SqlRef,
26};
27use crate::reconciler::ReconcileError;
28
/// Outcome of a plan-creation attempt.
///
/// The variant tells callers whether a plan was genuinely created or whether
/// an existing plan was reused, so they can decide whether to emit events.
#[derive(Debug, Clone)]
pub enum PlanCreationResult {
    /// A brand-new plan resource was created; carries its name.
    Created(String),
    /// A plan with the same hash already existed; carries that plan's name.
    Deduplicated(String),
}

impl PlanCreationResult {
    /// Name of the plan this result refers to, whichever variant it is.
    pub fn plan_name(&self) -> &str {
        // Both variants carry exactly one name, so the or-pattern is irrefutable.
        let (Self::Created(name) | Self::Deduplicated(name)) = self;
        name
    }

    /// Whether this result represents a newly created plan (not a dedup hit).
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Deduplicated(_) => false,
        }
    }
}
52
/// Largest SQL preview, in bytes, that is kept inline in the plan status.
/// Anything bigger spills to a ConfigMap.
const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;

/// Key under the ConfigMap's `binaryData` that holds the gzip-compressed SQL.
const SQL_CONFIGMAP_GZIP_KEY: &str = "plan.sql.gz";

/// Ceiling on the bytes stored in a SQL ConfigMap. Kubernetes caps ConfigMap
/// data at 1 MiB; staying at 900 KiB is deliberately conservative and leaves
/// room for metadata and future labels.
const MAX_CONFIGMAP_SQL_BYTES: usize = 900 * 1024;

/// Grace period, in seconds, before a status-less plan or an orphan ConfigMap
/// is considered stale.
const ORPHAN_GRACE_SECS: i64 = 60;

/// Upper bound, in seconds, on best-effort cleanup so that it never blocks a
/// fresh reconcile for long.
const CLEANUP_TIMEOUT_SECS: u64 = 5;

/// Number of historical plans retained per policy when no explicit limit is
/// supplied.
const DEFAULT_MAX_PLANS: usize = 10;

/// Dedup window, in seconds, for Failed plans. A Failed plan only counts as a
/// dedup match while it is younger than this; older failures are ignored so
/// that retries after the user fixes the environment are not blocked.
const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
76
/// How the redacted SQL preview of a plan is stored.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PlanSqlArtifact {
    /// The complete SQL preview, small enough to live inline in the plan status.
    Inline(String),
    /// The SQL preview gzip-compressed into a ConfigMap.
    CompressedConfigMap {
        /// Name of the ConfigMap holding the compressed SQL.
        configmap_name: String,
        /// `binaryData` key under which the compressed SQL is stored.
        key: String,
        /// The gzip-compressed SQL bytes.
        compressed_sql: Vec<u8>,
    },
    /// A cut-down inline preview, used when even the compressed SQL is too
    /// large for a ConfigMap.
    TruncatedInline(String),
}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
89struct PreparedPlanSql {
90    artifact: PlanSqlArtifact,
91    redacted_sql_hash: String,
92    original_bytes: usize,
93    stored_bytes: usize,
94}
95
96impl PreparedPlanSql {
97    fn sql_ref(&self) -> Option<SqlRef> {
98        match &self.artifact {
99            PlanSqlArtifact::CompressedConfigMap {
100                configmap_name,
101                key,
102                ..
103            } => Some(SqlRef {
104                name: configmap_name.clone(),
105                key: key.clone(),
106                compression: Some(SqlCompression::Gzip),
107            }),
108            PlanSqlArtifact::Inline(_) | PlanSqlArtifact::TruncatedInline(_) => None,
109        }
110    }
111
112    fn sql_inline(&self) -> Option<String> {
113        match &self.artifact {
114            PlanSqlArtifact::Inline(sql) | PlanSqlArtifact::TruncatedInline(sql) => {
115                Some(sql.clone())
116            }
117            PlanSqlArtifact::CompressedConfigMap { .. } => None,
118        }
119    }
120
121    fn is_truncated(&self) -> bool {
122        matches!(self.artifact, PlanSqlArtifact::TruncatedInline(_))
123    }
124}
125
126// ---------------------------------------------------------------------------
127// Plan approval check
128// ---------------------------------------------------------------------------
129
/// Approval state derived from a plan's approval annotations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanApprovalState {
    /// Neither the approved nor the rejected annotation is set to `"true"`.
    Pending,
    /// The approved annotation is `"true"` and the plan is not rejected.
    Approved,
    /// The rejected annotation is `"true"`; this wins over approval.
    Rejected,
}
137
138/// Check the approval state of a plan by inspecting its annotations.
139///
140/// Rejection takes priority over approval: if both annotations are set,
141/// the plan is considered rejected.
142pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
143    let annotations = plan.metadata.annotations.as_ref();
144
145    let rejected = annotations
146        .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
147        .map(|v| v == "true")
148        .unwrap_or(false);
149
150    if rejected {
151        return PlanApprovalState::Rejected;
152    }
153
154    let approved = annotations
155        .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
156        .map(|v| v == "true")
157        .unwrap_or(false);
158
159    if approved {
160        return PlanApprovalState::Approved;
161    }
162
163    PlanApprovalState::Pending
164}
165
166// ---------------------------------------------------------------------------
167// Plan creation
168// ---------------------------------------------------------------------------
169
170/// Create or deduplicate a `PostgresPolicyPlan` for the given policy and changes.
171///
172/// Returns the name of the plan resource (either existing or newly created).
173///
174/// This function:
175/// 1. Renders the full executable SQL from the changes
176/// 2. Computes SHA-256 of the full SQL (before any redaction/truncation)
177/// 3. Checks for an existing Pending plan with the same hash (dedup)
178/// 4. Persists the SQL preview artifact, if needed
179/// 5. Creates the new plan resource with ownerReferences
180/// 6. Updates the plan status
181/// 7. Marks any older Pending plans with a different hash as Superseded
182#[allow(clippy::too_many_arguments)]
183pub async fn create_or_update_plan(
184    client: &Client,
185    policy: &PostgresPolicy,
186    changes: &[pgroles_core::diff::Change],
187    sql_context: &pgroles_core::sql::SqlContext,
188    inspect_config: &pgroles_inspect::InspectConfig,
189    reconciliation_mode: CrdReconciliationMode,
190    database_identity: &str,
191    change_summary: &ChangeSummary,
192) -> Result<PlanCreationResult, ReconcileError> {
193    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
194    let policy_name = policy.name_any();
195    let generation = policy.metadata.generation.unwrap_or(0);
196
197    // 1. Render the full executable SQL (not redacted).
198    let full_sql = render_full_sql(changes, sql_context);
199
200    // 2. Compute SHA-256 hash of the full SQL.
201    let sql_hash = compute_sql_hash(&full_sql);
202
203    // 3. Count SQL statements (after wildcard expansion).
204    let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
205
206    // 4. Render redacted SQL for display (passwords masked).
207    let redacted_sql = render_redacted_sql(changes, sql_context);
208
209    cleanup_old_plans_best_effort(client, policy, None).await;
210
211    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
212
213    // 4. List existing plans for this policy.
214    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
215    let existing_plans = plans_api
216        .list(&ListParams::default().labels(&label_selector))
217        .await?;
218
219    // 5. Check for duplicate pending plan with same hash.
220    for plan in &existing_plans {
221        if let Some(ref status) = plan.status
222            && status.phase == PlanPhase::Pending
223            && status.sql_hash.as_deref() == Some(&sql_hash)
224        {
225            // Identical plan already exists — return early (deduplicated).
226            let plan_name = plan.name_any();
227            info!(
228                plan = %plan_name,
229                policy = %policy_name,
230                "existing pending plan has identical SQL hash, skipping creation"
231            );
232            return Ok(PlanCreationResult::Deduplicated(plan_name));
233        }
234    }
235
236    // 5b. Check for recently-failed plan with the same hash. If a plan with
237    // this exact SQL already failed within the dedup window, creating another
238    // identical one is pointless — it would produce the same error. The window
239    // ensures we don't block retries after the user fixes the environment.
240    //
241    // Uses `status.failed_at` (not `creation_timestamp`) so that plans which
242    // waited for approval before failing are measured from the failure time.
243    let now_ts = now_epoch_secs();
244    for plan in &existing_plans {
245        if let Some(ref status) = plan.status
246            && status.phase == PlanPhase::Failed
247            && status.sql_hash.as_deref() == Some(&sql_hash)
248        {
249            let failed_ts = status
250                .failed_at
251                .as_deref()
252                .and_then(parse_rfc3339_epoch_secs)
253                .unwrap_or(0);
254            if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
255                let plan_name = plan.name_any();
256                info!(
257                    plan = %plan_name,
258                    policy = %policy_name,
259                    age_secs = now_ts - failed_ts,
260                    "recently-failed plan has identical SQL hash, skipping creation"
261                );
262                return Ok(PlanCreationResult::Deduplicated(plan_name));
263            }
264        }
265    }
266
267    // 6. Generate a plan name using timestamp plus SQL hash. The hash suffix
268    // makes same-second retries after content persistence failures idempotent.
269    let plan_name = generate_plan_name(&policy_name, &sql_hash);
270    let prepared_sql = prepare_plan_sql(&plan_name, &redacted_sql)?;
271
272    // 7. Persist SQL content before materialising the visible plan resource.
273    let sql_configmap_name = create_plan_sql_configmap(
274        client,
275        policy,
276        &namespace,
277        &policy_name,
278        database_identity,
279        &prepared_sql,
280    )
281    .await?;
282
283    // 8. Build ownerReference pointing to the parent policy.
284    let owner_ref = build_owner_reference(policy);
285
286    // 9. Create the plan resource.
287    let plan = PostgresPolicyPlan::new(
288        &plan_name,
289        PostgresPolicyPlanSpec {
290            policy_ref: PolicyPlanRef {
291                name: policy_name.clone(),
292            },
293            policy_generation: generation,
294            reconciliation_mode,
295            owned_roles: inspect_config.managed_roles.clone(),
296            owned_schemas: inspect_config.managed_schemas.clone(),
297            managed_database_identity: database_identity.to_string(),
298        },
299    );
300    let mut plan = plan;
301    plan.metadata.namespace = Some(namespace.clone());
302    plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
303    plan.metadata.labels = Some(BTreeMap::from([
304        (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
305        (
306            LABEL_DATABASE_IDENTITY.to_string(),
307            sanitize_label_value(database_identity),
308        ),
309    ]));
310
311    // Annotations for quick visibility in kubectl describe / Lens.
312    let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
313    let summary_text = format!(
314        "{}R {}G {}D {}DP {}M",
315        change_summary.roles_created + change_summary.roles_altered,
316        change_summary.grants_added,
317        change_summary.default_privileges_set,
318        change_summary.roles_dropped,
319        change_summary.members_added,
320    );
321    plan.metadata.annotations = Some(BTreeMap::from([
322        ("pgroles.io/sql-preview".to_string(), sql_preview),
323        ("pgroles.io/summary".to_string(), summary_text),
324        (
325            "pgroles.io/sql-hash".to_string(),
326            sql_hash[..12].to_string(),
327        ),
328        (
329            "pgroles.io/redacted-sql-hash".to_string(),
330            prepared_sql.redacted_sql_hash[..12].to_string(),
331        ),
332        (
333            "pgroles.io/sql-original-bytes".to_string(),
334            prepared_sql.original_bytes.to_string(),
335        ),
336        (
337            "pgroles.io/sql-stored-bytes".to_string(),
338            prepared_sql.stored_bytes.to_string(),
339        ),
340    ]));
341
342    let (created_plan, created_new_plan) =
343        match plans_api.create(&PostParams::default(), &plan).await {
344            Ok(plan) => (plan, true),
345            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
346                let existing = plans_api.get(&plan_name).await?;
347                (existing, false)
348            }
349            Err(err) => {
350                if let Some(configmap_name) = sql_configmap_name.as_deref() {
351                    delete_configmap_best_effort(client, &namespace, configmap_name).await;
352                }
353                return Err(err.into());
354            }
355        };
356    let plan_name = created_plan.name_any();
357
358    // 11. Update plan status.
359    let computed_message = if prepared_sql.is_truncated() {
360        format!(
361            "Plan computed with {} change(s); SQL preview truncated because compressed SQL exceeded Kubernetes ConfigMap limits",
362            change_summary.total
363        )
364    } else {
365        format!("Plan computed with {} change(s)", change_summary.total)
366    };
367    let plan_status = PostgresPolicyPlanStatus {
368        phase: PlanPhase::Pending,
369        conditions: vec![
370            PolicyCondition {
371                condition_type: "Computed".to_string(),
372                status: "True".to_string(),
373                reason: Some("PlanComputed".to_string()),
374                message: Some(computed_message),
375                last_transition_time: Some(crate::crd::now_rfc3339()),
376            },
377            PolicyCondition {
378                condition_type: "Approved".to_string(),
379                status: "False".to_string(),
380                reason: Some("PendingApproval".to_string()),
381                message: Some("Plan awaiting approval".to_string()),
382                last_transition_time: Some(crate::crd::now_rfc3339()),
383            },
384        ],
385        change_summary: Some(change_summary.clone()),
386        sql_ref: prepared_sql.sql_ref(),
387        sql_inline: prepared_sql.sql_inline(),
388        sql_truncated: prepared_sql.is_truncated(),
389        computed_at: Some(crate::crd::now_rfc3339()),
390        applied_at: None,
391        last_error: None,
392        sql_hash: Some(sql_hash),
393        applying_since: None,
394        failed_at: None,
395        sql_statements: Some(sql_statement_count),
396        redacted_sql_hash: Some(prepared_sql.redacted_sql_hash.clone()),
397        sql_original_bytes: Some(prepared_sql.original_bytes as i64),
398        sql_stored_bytes: Some(prepared_sql.stored_bytes as i64),
399    };
400
401    let status_patch = serde_json::json!({ "status": plan_status });
402    if let Err(err) = plans_api
403        .patch_status(
404            &plan_name,
405            &PatchParams::apply("pgroles-operator"),
406            &Patch::Merge(&status_patch),
407        )
408        .await
409    {
410        if created_new_plan {
411            delete_plan_best_effort(&plans_api, &plan_name).await;
412        }
413        if let Some(configmap_name) = sql_configmap_name.as_deref() {
414            delete_configmap_best_effort(client, &namespace, configmap_name).await;
415        }
416        return Err(err.into());
417    }
418
419    // 12. Mark any existing Pending plans as Superseded after the new plan is
420    // fully visible. This avoids losing the current actionable plan if SQL
421    // persistence fails before the replacement is materialised.
422    for plan in &existing_plans {
423        if let Some(ref status) = plan.status
424            && status.phase == PlanPhase::Pending
425            && plan.name_any() != plan_name
426        {
427            let old_plan_name = plan.name_any();
428            info!(
429                plan = %old_plan_name,
430                policy = %policy_name,
431                "marking existing pending plan as Superseded"
432            );
433            let superseded_status = PostgresPolicyPlanStatus {
434                phase: PlanPhase::Superseded,
435                ..status.clone()
436            };
437            let patch = serde_json::json!({ "status": superseded_status });
438            plans_api
439                .patch_status(
440                    &old_plan_name,
441                    &PatchParams::apply("pgroles-operator"),
442                    &Patch::Merge(&patch),
443                )
444                .await?;
445        }
446    }
447
448    info!(
449        plan = %plan_name,
450        policy = %policy_name,
451        changes = change_summary.total,
452        "created new plan"
453    );
454
455    Ok(PlanCreationResult::Created(plan_name))
456}
457
458// ---------------------------------------------------------------------------
459// Plan execution
460// ---------------------------------------------------------------------------
461
462/// Execute an approved plan against the database.
463///
464/// Re-renders executable SQL from the reconciler's in-memory changes, executes
465/// it in a transaction, and updates the plan status to Applied or Failed.
466/// Persisted SQL on the plan is a redacted review artifact only; apply must not
467/// read it because large plans may store only a truncated preview.
468pub async fn execute_plan(
469    client: &Client,
470    plan: &PostgresPolicyPlan,
471    pool: &sqlx::PgPool,
472    sql_context: &pgroles_core::sql::SqlContext,
473    changes: &[pgroles_core::diff::Change],
474) -> Result<(), ReconcileError> {
475    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
476    let plan_name = plan.name_any();
477    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
478
479    // Update phase to Applying.
480    update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
481
482    // Execute the SQL in a transaction using the original changes (not stored SQL).
483    // This ensures we use the actual executable SQL including real passwords,
484    // not the redacted version stored in the plan.
485    let result = execute_changes_in_transaction(pool, changes, sql_context).await;
486
487    match result {
488        Ok(statements_executed) => {
489            // Update plan status to Applied.
490            let mut applied_status = plan.status.clone().unwrap_or_default();
491            applied_status.phase = PlanPhase::Applied;
492            applied_status.applied_at = Some(crate::crd::now_rfc3339());
493            applied_status.last_error = None;
494            set_plan_condition(
495                &mut applied_status.conditions,
496                "Approved",
497                "True",
498                "Approved",
499                "Plan approved and executed",
500            );
501
502            let patch = serde_json::json!({ "status": applied_status });
503            plans_api
504                .patch_status(
505                    &plan_name,
506                    &PatchParams::apply("pgroles-operator"),
507                    &Patch::Merge(&patch),
508                )
509                .await?;
510
511            info!(
512                plan = %plan_name,
513                statements = statements_executed,
514                "plan executed successfully"
515            );
516            Ok(())
517        }
518        Err(err) => {
519            // Update plan status to Failed.
520            let error_message = err.to_string();
521            let mut failed_status = plan.status.clone().unwrap_or_default();
522            failed_status.phase = PlanPhase::Failed;
523            failed_status.last_error = Some(error_message);
524            failed_status.failed_at = Some(crate::crd::now_rfc3339());
525
526            let patch = serde_json::json!({ "status": failed_status });
527            if let Err(status_err) = plans_api
528                .patch_status(
529                    &plan_name,
530                    &PatchParams::apply("pgroles-operator"),
531                    &Patch::Merge(&patch),
532                )
533                .await
534            {
535                tracing::warn!(
536                    plan = %plan_name,
537                    %status_err,
538                    "failed to update plan status to Failed"
539                );
540            }
541
542            Err(err)
543        }
544    }
545}
546
547fn prepare_plan_sql(
548    plan_name: &str,
549    redacted_sql: &str,
550) -> Result<PreparedPlanSql, ReconcileError> {
551    let original_bytes = redacted_sql.len();
552    let redacted_sql_hash = compute_sql_hash(redacted_sql);
553
554    if original_bytes <= MAX_INLINE_SQL_BYTES {
555        return Ok(PreparedPlanSql {
556            artifact: PlanSqlArtifact::Inline(redacted_sql.to_string()),
557            redacted_sql_hash,
558            original_bytes,
559            stored_bytes: original_bytes,
560        });
561    }
562
563    let compressed_sql = gzip_bytes(redacted_sql.as_bytes())?;
564    if compressed_sql.len() <= MAX_CONFIGMAP_SQL_BYTES {
565        let stored_bytes = compressed_sql.len();
566        return Ok(PreparedPlanSql {
567            artifact: PlanSqlArtifact::CompressedConfigMap {
568                configmap_name: format!("{plan_name}-sql"),
569                key: SQL_CONFIGMAP_GZIP_KEY.to_string(),
570                compressed_sql,
571            },
572            redacted_sql_hash,
573            original_bytes,
574            stored_bytes,
575        });
576    }
577
578    let truncated = truncate_utf8(
579        redacted_sql,
580        MAX_INLINE_SQL_BYTES,
581        "\n-- truncated: compressed SQL preview exceeded Kubernetes ConfigMap limits --",
582    );
583    let stored_bytes = truncated.len();
584    Ok(PreparedPlanSql {
585        artifact: PlanSqlArtifact::TruncatedInline(truncated),
586        redacted_sql_hash,
587        original_bytes,
588        stored_bytes,
589    })
590}
591
592fn gzip_bytes(bytes: &[u8]) -> Result<Vec<u8>, ReconcileError> {
593    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
594    encoder
595        .write_all(bytes)
596        .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))?;
597    encoder
598        .finish()
599        .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))
600}
601
/// Shorten `text` so the result is at most `max_bytes` bytes long.
///
/// Text that already fits is returned unchanged. Otherwise the text is cut at
/// a UTF-8 character boundary and `marker` is appended, with the cut chosen so
/// that text plus marker stay within `max_bytes`.
///
/// If `marker` alone is longer than `max_bytes` it cannot be included without
/// breaking the byte ceiling, so it is dropped and only the truncated text is
/// returned.
fn truncate_utf8(text: &str, max_bytes: usize, marker: &str) -> String {
    if text.len() <= max_bytes {
        return text.to_string();
    }

    // A marker that does not fit would push the result past `max_bytes`.
    let marker = if marker.len() <= max_bytes { marker } else { "" };
    let budget = max_bytes - marker.len();

    // Walk back from the byte budget to the nearest character boundary so a
    // multi-byte character is never split. Index 0 is always a boundary, and
    // `budget <= max_bytes < text.len()` keeps every probe in range.
    let end = (0..=budget)
        .rev()
        .find(|&index| text.is_char_boundary(index))
        .unwrap_or(0);

    let mut truncated = String::with_capacity(end + marker.len());
    truncated.push_str(&text[..end]);
    truncated.push_str(marker);
    truncated
}
617
618async fn create_plan_sql_configmap(
619    client: &Client,
620    policy: &PostgresPolicy,
621    namespace: &str,
622    policy_name: &str,
623    database_identity: &str,
624    prepared_sql: &PreparedPlanSql,
625) -> Result<Option<String>, ReconcileError> {
626    let PlanSqlArtifact::CompressedConfigMap {
627        configmap_name,
628        key: _,
629        compressed_sql: _,
630    } = &prepared_sql.artifact
631    else {
632        return Ok(None);
633    };
634
635    let configmap = build_plan_sql_configmap_object(
636        policy,
637        namespace,
638        policy_name,
639        database_identity,
640        prepared_sql,
641    )?;
642
643    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
644    match configmaps_api
645        .create(&PostParams::default(), &configmap)
646        .await
647    {
648        Ok(_) => Ok(Some(configmap_name.clone())),
649        Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
650            let existing = configmaps_api.get(configmap_name).await?;
651            validate_existing_sql_configmap(&existing, prepared_sql)?;
652            Ok(Some(configmap_name.clone()))
653        }
654        Err(err) => Err(err.into()),
655    }
656}
657
658fn build_plan_sql_configmap_object(
659    policy: &PostgresPolicy,
660    namespace: &str,
661    policy_name: &str,
662    database_identity: &str,
663    prepared_sql: &PreparedPlanSql,
664) -> Result<ConfigMap, ReconcileError> {
665    let PlanSqlArtifact::CompressedConfigMap {
666        configmap_name,
667        key,
668        compressed_sql,
669    } = &prepared_sql.artifact
670    else {
671        return Err(ReconcileError::PlanSqlStorage(
672            "cannot build ConfigMap for inline plan SQL".to_string(),
673        ));
674    };
675
676    Ok(ConfigMap {
677        metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
678            name: Some(configmap_name.clone()),
679            namespace: Some(namespace.to_string()),
680            owner_references: Some(vec![build_owner_reference(policy)]),
681            labels: Some(BTreeMap::from([
682                (LABEL_POLICY.to_string(), sanitize_label_value(policy_name)),
683                (
684                    LABEL_DATABASE_IDENTITY.to_string(),
685                    sanitize_label_value(database_identity),
686                ),
687                (
688                    LABEL_PLAN.to_string(),
689                    sanitize_label_value(configmap_plan_name(configmap_name)),
690                ),
691            ])),
692            annotations: Some(BTreeMap::from([
693                ("pgroles.io/sql-compression".to_string(), "gzip".to_string()),
694                (
695                    "pgroles.io/redacted-sql-hash".to_string(),
696                    prepared_sql.redacted_sql_hash.clone(),
697                ),
698                (
699                    "pgroles.io/sql-original-bytes".to_string(),
700                    prepared_sql.original_bytes.to_string(),
701                ),
702                (
703                    "pgroles.io/sql-stored-bytes".to_string(),
704                    prepared_sql.stored_bytes.to_string(),
705                ),
706            ])),
707            ..Default::default()
708        },
709        binary_data: Some(BTreeMap::from([(
710            key.clone(),
711            ByteString(compressed_sql.clone()),
712        )])),
713        ..Default::default()
714    })
715}
716
/// Recover the plan name from a SQL ConfigMap name by removing one trailing
/// `-sql`; a name without that suffix is returned unchanged.
fn configmap_plan_name(configmap_name: &str) -> &str {
    match configmap_name.strip_suffix("-sql") {
        Some(plan_name) => plan_name,
        None => configmap_name,
    }
}
722
723fn validate_existing_sql_configmap(
724    configmap: &ConfigMap,
725    prepared_sql: &PreparedPlanSql,
726) -> Result<(), ReconcileError> {
727    let Some(annotations) = configmap.metadata.annotations.as_ref() else {
728        return Err(ReconcileError::PlanSqlStorage(format!(
729            "existing ConfigMap {} is missing SQL storage annotations",
730            configmap.name_any()
731        )));
732    };
733    let hash_matches = annotations
734        .get("pgroles.io/redacted-sql-hash")
735        .map(|hash| hash == &prepared_sql.redacted_sql_hash)
736        .unwrap_or(false);
737    if hash_matches {
738        Ok(())
739    } else {
740        Err(ReconcileError::PlanSqlStorage(format!(
741            "existing ConfigMap {} does not match computed SQL preview hash",
742            configmap.name_any()
743        )))
744    }
745}
746
747/// Execute SQL changes in a database transaction.
748///
749/// Returns the number of statements executed on success.
750async fn execute_changes_in_transaction(
751    pool: &sqlx::PgPool,
752    changes: &[pgroles_core::diff::Change],
753    sql_context: &pgroles_core::sql::SqlContext,
754) -> Result<usize, ReconcileError> {
755    let mut transaction = pool.begin().await?;
756    let mut statements_executed = 0usize;
757
758    for change in changes {
759        let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
760        for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
761            if is_sensitive {
762                tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
763            } else {
764                tracing::debug!(%sql, "executing");
765            }
766            sqlx::query(&sql).execute(transaction.as_mut()).await?;
767            statements_executed += 1;
768        }
769    }
770
771    transaction.commit().await?;
772    Ok(statements_executed)
773}
774
775// ---------------------------------------------------------------------------
776// Plan cleanup / retention
777// ---------------------------------------------------------------------------
778
779/// Best-effort cleanup wrapper used on hot reconciliation paths. Cleanup should
780/// reduce leaked resources, never block otherwise valid reconciliation.
781pub async fn cleanup_old_plans_best_effort(
782    client: &Client,
783    policy: &PostgresPolicy,
784    max_plans: Option<usize>,
785) {
786    match tokio::time::timeout(
787        Duration::from_secs(CLEANUP_TIMEOUT_SECS),
788        cleanup_old_plans(client, policy, max_plans),
789    )
790    .await
791    {
792        Ok(Ok(())) => {}
793        Ok(Err(err)) => tracing::warn!(%err, "failed to clean up old plans"),
794        Err(_) => tracing::warn!(
795            timeout_secs = CLEANUP_TIMEOUT_SECS,
796            "timed out cleaning up old plans"
797        ),
798    }
799}
800
801/// Clean up old plans for a policy, retaining at most `max_plans` terminal plans.
802///
803/// Terminal plans are those in Applied, Failed, Superseded, or Rejected phase;
804/// Pending, Approved, and Applying plans are retained. Status-less plans and
805/// SQL ConfigMaps older than a short grace period are treated as stale orphans.
806pub async fn cleanup_old_plans(
807    client: &Client,
808    policy: &PostgresPolicy,
809    max_plans: Option<usize>,
810) -> Result<(), ReconcileError> {
811    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
812    let policy_name = policy.name_any();
813    let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
814
815    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
816    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
817    let existing_plans = plans_api
818        .list(&ListParams::default().labels(&label_selector))
819        .await?;
820    let now_ts = now_epoch_secs();
821
822    for plan in existing_plans
823        .iter()
824        .filter(|plan| is_stale_statusless_plan(plan, now_ts))
825    {
826        let plan_name = plan.name_any();
827        info!(
828            plan = %plan_name,
829            policy = %policy_name,
830            "cleaning up stale status-less plan"
831        );
832        if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
833            tracing::warn!(
834                plan = %plan_name,
835                %err,
836                "failed to delete stale status-less plan during cleanup"
837            );
838        }
839    }
840
841    // Collect terminal plans sorted by creation timestamp (oldest first).
842    let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
843        .iter()
844        .filter(|plan| {
845            plan.status
846                .as_ref()
847                .map(|s| {
848                    matches!(
849                        s.phase,
850                        PlanPhase::Applied
851                            | PlanPhase::Failed
852                            | PlanPhase::Superseded
853                            | PlanPhase::Rejected
854                    )
855                })
856                .unwrap_or(false)
857        })
858        .collect();
859
860    if terminal_plans.len() > max_plans {
861        // Sort by creation timestamp ascending (oldest first).
862        terminal_plans.sort_by(|a, b| {
863            let a_time = a.metadata.creation_timestamp.as_ref();
864            let b_time = b.metadata.creation_timestamp.as_ref();
865            a_time.cmp(&b_time)
866        });
867
868        let plans_to_delete = terminal_plans.len() - max_plans;
869        for plan in terminal_plans.into_iter().take(plans_to_delete) {
870            let plan_name = plan.name_any();
871            info!(
872                plan = %plan_name,
873                policy = %policy_name,
874                "cleaning up old plan"
875            );
876            if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
877                tracing::warn!(
878                    plan = %plan_name,
879                    %err,
880                    "failed to delete old plan during cleanup"
881                );
882            }
883        }
884    }
885
886    cleanup_orphan_sql_configmaps(
887        client,
888        &namespace,
889        &policy_name,
890        &existing_plans.items,
891        now_ts,
892    )
893    .await?;
894
895    Ok(())
896}
897
898// ---------------------------------------------------------------------------
899// Helpers
900// ---------------------------------------------------------------------------
901
902async fn cleanup_orphan_sql_configmaps(
903    client: &Client,
904    namespace: &str,
905    policy_name: &str,
906    existing_plans: &[PostgresPolicyPlan],
907    now_ts: i64,
908) -> Result<(), ReconcileError> {
909    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
910    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(policy_name));
911    let configmaps = configmaps_api
912        .list(&ListParams::default().labels(&label_selector))
913        .await?;
914    let known_plan_labels: BTreeSet<String> = existing_plans
915        .iter()
916        .map(|plan| sanitize_label_value(&plan.name_any()))
917        .collect();
918
919    for configmap in configmaps {
920        if !is_orphan_sql_configmap(&configmap, &known_plan_labels, now_ts) {
921            continue;
922        }
923
924        let configmap_name = configmap.name_any();
925        info!(
926            configmap = %configmap_name,
927            policy = %policy_name,
928            "cleaning up orphan plan SQL ConfigMap"
929        );
930        if let Err(err) = configmaps_api
931            .delete(&configmap_name, &DeleteParams::default())
932            .await
933        {
934            tracing::warn!(
935                configmap = %configmap_name,
936                %err,
937                "failed to delete orphan plan SQL ConfigMap during cleanup"
938            );
939        }
940    }
941
942    Ok(())
943}
944
945fn is_orphan_sql_configmap(
946    configmap: &ConfigMap,
947    known_plan_labels: &BTreeSet<String>,
948    now_ts: i64,
949) -> bool {
950    let Some(labels) = configmap.metadata.labels.as_ref() else {
951        return false;
952    };
953    if !labels.contains_key(LABEL_POLICY) || !is_stale_object(configmap, now_ts) {
954        return false;
955    }
956    labels
957        .get(LABEL_PLAN)
958        .map(|plan_label| !known_plan_labels.contains(plan_label))
959        .unwrap_or(true)
960}
961
962fn is_stale_statusless_plan(plan: &PostgresPolicyPlan, now_ts: i64) -> bool {
963    plan.status.is_none() && is_stale_object(plan, now_ts)
964}
965
966fn is_stale_object<K>(resource: &K, now_ts: i64) -> bool
967where
968    K: Resource,
969{
970    resource
971        .meta()
972        .creation_timestamp
973        .as_ref()
974        .map(|timestamp| now_ts.saturating_sub(timestamp.0.as_second()) > ORPHAN_GRACE_SECS)
975        .unwrap_or(false)
976}
977
978async fn delete_plan_best_effort(plans_api: &Api<PostgresPolicyPlan>, plan_name: &str) {
979    if let Err(err) = plans_api.delete(plan_name, &DeleteParams::default()).await {
980        tracing::warn!(
981            plan = %plan_name,
982            %err,
983            "failed to roll back plan after status update failure"
984        );
985    }
986}
987
988async fn delete_configmap_best_effort(client: &Client, namespace: &str, configmap_name: &str) {
989    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
990    if let Err(err) = configmaps_api
991        .delete(configmap_name, &DeleteParams::default())
992        .await
993    {
994        tracing::warn!(
995            configmap = %configmap_name,
996            %err,
997            "failed to roll back plan SQL ConfigMap"
998        );
999    }
1000}
1001
1002/// Render the full executable SQL from changes (including real passwords).
1003pub(crate) fn render_full_sql(
1004    changes: &[pgroles_core::diff::Change],
1005    sql_context: &pgroles_core::sql::SqlContext,
1006) -> String {
1007    changes
1008        .iter()
1009        .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
1010        .collect::<Vec<_>>()
1011        .join("\n")
1012}
1013
1014/// Render redacted SQL for display (passwords replaced with [REDACTED]).
1015fn render_redacted_sql(
1016    changes: &[pgroles_core::diff::Change],
1017    sql_context: &pgroles_core::sql::SqlContext,
1018) -> String {
1019    changes
1020        .iter()
1021        .flat_map(|change| {
1022            if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
1023                vec![format!(
1024                    "ALTER ROLE {} PASSWORD '[REDACTED]';",
1025                    pgroles_core::sql::quote_ident(name)
1026                )]
1027            } else {
1028                pgroles_core::sql::render_statements_with_context(change, sql_context)
1029            }
1030        })
1031        .collect::<Vec<_>>()
1032        .join("\n")
1033}
1034
1035/// Compute SHA-256 hash of the SQL string as a hex digest.
1036pub(crate) fn compute_sql_hash(sql: &str) -> String {
1037    use std::fmt::Write as _;
1038
1039    let mut hasher = Sha256::new();
1040    hasher.update(sql.as_bytes());
1041    let digest = hasher.finalize();
1042    let mut hex = String::with_capacity(digest.len() * 2);
1043    for byte in digest {
1044        write!(&mut hex, "{byte:02x}").expect("writing to a string should succeed");
1045    }
1046    hex
1047}
1048
1049/// Generate a plan name from policy name, current timestamp, and SQL hash.
1050///
1051/// Format: `{policy-name}-plan-{YYYYMMDD-HHMMSS}-{hash-prefix}`
1052///
1053/// The hash suffix makes retries within the same second idempotent if SQL
1054/// content persistence succeeds but plan creation fails.
1055fn generate_plan_name(policy_name: &str, sql_hash: &str) -> String {
1056    let timestamp = format_timestamp_compact();
1057    let suffix = &sql_hash[..12.min(sql_hash.len())];
1058    // Kubernetes names must be <= 253 chars and DNS-compatible.
1059    // Reserve 4 chars for the potential "-sql" ConfigMap suffix.
1060    let max_name_len = 253 - 4; // 249
1061    let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
1062    let prefix = if policy_name.len() > max_prefix_len {
1063        policy_name
1064            .char_indices()
1065            .take_while(|(idx, ch)| idx + ch.len_utf8() <= max_prefix_len)
1066            .map(|(_, ch)| ch)
1067            .collect::<String>()
1068    } else {
1069        policy_name.to_string()
1070    };
1071    format!("{prefix}-plan-{timestamp}-{suffix}")
1072}
1073
1074/// Format the current UTC time as `YYYYMMDD-HHMMSS`.
1075fn format_timestamp_compact() -> String {
1076    use std::time::SystemTime;
1077    let now = SystemTime::now()
1078        .duration_since(SystemTime::UNIX_EPOCH)
1079        .unwrap_or_default();
1080    let secs = now.as_secs();
1081    let (year, month, day) = crate::crd::days_to_date(secs / 86400);
1082    let remaining = secs % 86400;
1083    let hours = remaining / 3600;
1084    let minutes = (remaining % 3600) / 60;
1085    let seconds = remaining % 60;
1086    format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
1087}
1088
/// Sanitize a string for use as a Kubernetes label value.
///
/// Label values must be at most 63 characters, consist of `[a-z0-9A-Z._-]`,
/// and — unless empty — begin and end with an alphanumeric character.
///
/// Every character outside the allowed set is replaced with `_`. Leading
/// non-alphanumeric characters are then dropped, the result is capped at 63
/// characters, and trailing non-alphanumeric characters exposed by the cap
/// are dropped as well. Inputs that already form a valid label value are
/// returned unchanged; an input with no alphanumeric character yields `""`.
fn sanitize_label_value(value: &str) -> String {
    const MAX_LABEL_VALUE_LEN: usize = 63;

    let mut sanitized: String = value
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_') {
                c
            } else {
                '_'
            }
        })
        // A label value may not start with a separator.
        .skip_while(|c| !c.is_ascii_alphanumeric())
        .take(MAX_LABEL_VALUE_LEN)
        .collect();

    // ...nor end with one (truncation can expose a separator at the cut).
    let trimmed_len = sanitized
        .trim_end_matches(|c: char| !c.is_ascii_alphanumeric())
        .len();
    sanitized.truncate(trimmed_len);
    sanitized
}
1106
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_epoch_secs() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
1114
1115/// Parse an RFC 3339 timestamp string to Unix epoch seconds.
1116/// Returns `None` if parsing fails.
1117fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
1118    // Use jiff (already a transitive dep via k8s-openapi) for RFC 3339 parsing.
1119    rfc3339
1120        .parse::<jiff::Timestamp>()
1121        .ok()
1122        .map(|t| t.as_second())
1123}
1124
1125/// Build an OwnerReference pointing from a plan to its parent policy.
1126fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
1127    OwnerReference {
1128        api_version: PostgresPolicy::api_version(&()).to_string(),
1129        kind: PostgresPolicy::kind(&()).to_string(),
1130        name: policy.name_any(),
1131        uid: policy.metadata.uid.clone().unwrap_or_default(),
1132        controller: Some(true),
1133        block_owner_deletion: Some(true),
1134    }
1135}
1136
1137/// Update the phase field on a plan's status.
1138///
1139/// When transitioning to `Applying`, also sets `applying_since` for stuck
1140/// plan detection.
1141async fn update_plan_phase(
1142    plans_api: &Api<PostgresPolicyPlan>,
1143    plan_name: &str,
1144    phase: PlanPhase,
1145) -> Result<(), ReconcileError> {
1146    let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
1147    if phase == PlanPhase::Applying {
1148        patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
1149    }
1150    plans_api
1151        .patch_status(
1152            plan_name,
1153            &PatchParams::apply("pgroles-operator"),
1154            &Patch::Merge(&patch_value),
1155        )
1156        .await?;
1157    Ok(())
1158}
1159
1160/// Set or update a condition in a conditions list.
1161///
1162/// Preserves `last_transition_time` when the status value is unchanged
1163/// (only reason/message changed), matching Kubernetes condition conventions.
1164fn set_plan_condition(
1165    conditions: &mut Vec<PolicyCondition>,
1166    condition_type: &str,
1167    status: &str,
1168    reason: &str,
1169    message: &str,
1170) {
1171    let transition_time = if let Some(existing) = conditions
1172        .iter()
1173        .find(|c| c.condition_type == condition_type)
1174    {
1175        if existing.status == status {
1176            existing.last_transition_time.clone()
1177        } else {
1178            Some(crate::crd::now_rfc3339())
1179        }
1180    } else {
1181        Some(crate::crd::now_rfc3339())
1182    };
1183
1184    let condition = PolicyCondition {
1185        condition_type: condition_type.to_string(),
1186        status: status.to_string(),
1187        reason: Some(reason.to_string()),
1188        message: Some(message.to_string()),
1189        last_transition_time: transition_time,
1190    };
1191    if let Some(existing) = conditions
1192        .iter_mut()
1193        .find(|c| c.condition_type == condition_type)
1194    {
1195        *existing = condition;
1196    } else {
1197        conditions.push(condition);
1198    }
1199}
1200
1201/// Update the parent policy's `current_plan_ref` in status.
1202pub async fn update_policy_plan_ref(
1203    client: &Client,
1204    policy: &PostgresPolicy,
1205    plan_name: &str,
1206) -> Result<(), ReconcileError> {
1207    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1208    let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
1209
1210    let patch = serde_json::json!({
1211        "status": {
1212            "current_plan_ref": PlanReference {
1213                name: plan_name.to_string(),
1214            }
1215        }
1216    });
1217
1218    policy_api
1219        .patch_status(
1220            &policy.name_any(),
1221            &PatchParams::apply("pgroles-operator"),
1222            &Patch::Merge(&patch),
1223        )
1224        .await?;
1225
1226    Ok(())
1227}
1228
1229/// Look up the current actionable plan for a policy, if any.
1230///
1231/// An actionable plan is one in `Pending` or `Approved` phase — i.e. a plan
1232/// that the reconciler should evaluate for approval/execution.
1233pub async fn get_current_actionable_plan(
1234    client: &Client,
1235    policy: &PostgresPolicy,
1236) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1237    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1238    let policy_name = policy.name_any();
1239
1240    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1241    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1242    let existing_plans = plans_api
1243        .list(&ListParams::default().labels(&label_selector))
1244        .await?;
1245
1246    // Find the most recent actionable plan (Pending or Approved, by creation time).
1247    let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
1248        .into_iter()
1249        .filter(|plan| {
1250            plan.status
1251                .as_ref()
1252                .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
1253                .unwrap_or(false)
1254        })
1255        .collect();
1256
1257    pending_plans.sort_by(|a, b| {
1258        let a_time = a.metadata.creation_timestamp.as_ref();
1259        let b_time = b.metadata.creation_timestamp.as_ref();
1260        b_time.cmp(&a_time) // newest first
1261    });
1262
1263    Ok(pending_plans.into_iter().next())
1264}
1265
1266/// Look up the most recent plan for a policy in a given phase.
1267pub async fn get_plan_by_phase(
1268    client: &Client,
1269    policy: &PostgresPolicy,
1270    target_phase: PlanPhase,
1271) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1272    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1273    let policy_name = policy.name_any();
1274
1275    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1276    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1277    let existing_plans = plans_api
1278        .list(&ListParams::default().labels(&label_selector))
1279        .await?;
1280
1281    let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
1282        .into_iter()
1283        .filter(|plan| {
1284            plan.status
1285                .as_ref()
1286                .map(|s| s.phase == target_phase)
1287                .unwrap_or(false)
1288        })
1289        .collect();
1290
1291    matching_plans.sort_by(|a, b| {
1292        let a_time = a.metadata.creation_timestamp.as_ref();
1293        let b_time = b.metadata.creation_timestamp.as_ref();
1294        b_time.cmp(&a_time) // newest first
1295    });
1296
1297    Ok(matching_plans.into_iter().next())
1298}
1299
1300/// Mark a plan as Failed with a given error message.
1301pub async fn mark_plan_failed(
1302    client: &Client,
1303    plan: &PostgresPolicyPlan,
1304    error_message: &str,
1305) -> Result<(), ReconcileError> {
1306    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1307    let plan_name = plan.name_any();
1308    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1309
1310    let mut status = plan.status.clone().unwrap_or_default();
1311    status.phase = PlanPhase::Failed;
1312    status.last_error = Some(error_message.to_string());
1313    status.failed_at = Some(crate::crd::now_rfc3339());
1314
1315    let patch = serde_json::json!({ "status": status });
1316    plans_api
1317        .patch_status(
1318            &plan_name,
1319            &PatchParams::apply("pgroles-operator"),
1320            &Patch::Merge(&patch),
1321        )
1322        .await?;
1323
1324    info!(
1325        plan = %plan_name,
1326        "marked stuck Applying plan as Failed"
1327    );
1328
1329    Ok(())
1330}
1331
1332/// Mark a plan as Approved.
1333///
1334/// Callers provide `reason` and `message` to distinguish auto-approval from
1335/// manual approval in the plan's conditions.
1336pub async fn mark_plan_approved(
1337    client: &Client,
1338    plan: &PostgresPolicyPlan,
1339    reason: &str,
1340    message: &str,
1341) -> Result<(), ReconcileError> {
1342    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1343    let plan_name = plan.name_any();
1344    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1345
1346    let mut status = plan.status.clone().unwrap_or_default();
1347    status.phase = PlanPhase::Approved;
1348    set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
1349
1350    let patch = serde_json::json!({ "status": status });
1351    plans_api
1352        .patch_status(
1353            &plan_name,
1354            &PatchParams::apply("pgroles-operator"),
1355            &Patch::Merge(&patch),
1356        )
1357        .await?;
1358
1359    Ok(())
1360}
1361
1362/// Mark a plan as Rejected.
1363pub async fn mark_plan_rejected(
1364    client: &Client,
1365    plan: &PostgresPolicyPlan,
1366) -> Result<(), ReconcileError> {
1367    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1368    let plan_name = plan.name_any();
1369    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1370
1371    let mut status = plan.status.clone().unwrap_or_default();
1372    status.phase = PlanPhase::Rejected;
1373    set_plan_condition(
1374        &mut status.conditions,
1375        "Approved",
1376        "False",
1377        "Rejected",
1378        "Plan rejected via annotation",
1379    );
1380
1381    let patch = serde_json::json!({ "status": status });
1382    plans_api
1383        .patch_status(
1384            &plan_name,
1385            &PatchParams::apply("pgroles-operator"),
1386            &Patch::Merge(&patch),
1387        )
1388        .await?;
1389
1390    Ok(())
1391}
1392
1393/// Mark a plan as Superseded (database state changed since approval).
1394pub async fn mark_plan_superseded(
1395    client: &Client,
1396    plan: &PostgresPolicyPlan,
1397) -> Result<(), ReconcileError> {
1398    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1399    let plan_name = plan.name_any();
1400    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1401
1402    let mut status = plan.status.clone().unwrap_or_default();
1403    status.phase = PlanPhase::Superseded;
1404    set_plan_condition(
1405        &mut status.conditions,
1406        "Approved",
1407        "False",
1408        "Superseded",
1409        "Database state changed since plan was approved",
1410    );
1411
1412    let patch = serde_json::json!({ "status": status });
1413    plans_api
1414        .patch_status(
1415            &plan_name,
1416            &PatchParams::apply("pgroles-operator"),
1417            &Patch::Merge(&patch),
1418        )
1419        .await?;
1420
1421    Ok(())
1422}
1423
1424// ---------------------------------------------------------------------------
1425// Tests
1426// ---------------------------------------------------------------------------
1427
1428#[cfg(test)]
1429mod tests {
1430    use super::*;
1431    use crate::crd::CrdReconciliationMode;
1432    use base64::Engine as _;
1433    use flate2::read::GzDecoder;
1434    use std::io::Read;
1435
1436    fn test_plan(
1437        name: &str,
1438        phase: PlanPhase,
1439        annotations: Option<BTreeMap<String, String>>,
1440    ) -> PostgresPolicyPlan {
1441        let mut plan = PostgresPolicyPlan::new(
1442            name,
1443            PostgresPolicyPlanSpec {
1444                policy_ref: PolicyPlanRef {
1445                    name: "test-policy".to_string(),
1446                },
1447                policy_generation: 1,
1448                reconciliation_mode: CrdReconciliationMode::Authoritative,
1449                owned_roles: vec!["role-a".to_string()],
1450                owned_schemas: vec!["public".to_string()],
1451                managed_database_identity: "default/db/DATABASE_URL".to_string(),
1452            },
1453        );
1454        plan.metadata.namespace = Some("default".to_string());
1455        plan.metadata.annotations = annotations;
1456        plan.status = Some(PostgresPolicyPlanStatus {
1457            phase,
1458            ..Default::default()
1459        });
1460        plan
1461    }
1462
1463    #[test]
1464    fn check_plan_approval_pending_when_no_annotations() {
1465        let plan = test_plan("plan-1", PlanPhase::Pending, None);
1466        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1467    }
1468
1469    #[test]
1470    fn check_plan_approval_approved_with_annotation() {
1471        let annotations =
1472            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1473        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1474        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1475    }
1476
1477    #[test]
1478    fn check_plan_approval_rejected_with_annotation() {
1479        let annotations =
1480            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1481        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1482        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1483    }
1484
1485    #[test]
1486    fn check_plan_approval_rejected_wins_over_approved() {
1487        let annotations = BTreeMap::from([
1488            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1489            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1490        ]);
1491        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1492        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1493    }
1494
1495    #[test]
1496    fn check_plan_approval_non_true_value_is_pending() {
1497        let annotations =
1498            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1499        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1500        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1501    }
1502
1503    #[test]
1504    fn compute_sql_hash_is_deterministic() {
1505        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1506        let hash1 = compute_sql_hash(sql);
1507        let hash2 = compute_sql_hash(sql);
1508        assert_eq!(hash1, hash2);
1509        assert_eq!(hash1.len(), 64); // SHA-256 hex digest is 64 chars
1510    }
1511
1512    #[test]
1513    fn compute_sql_hash_differs_for_different_sql() {
1514        let hash1 = compute_sql_hash("CREATE ROLE a;");
1515        let hash2 = compute_sql_hash("CREATE ROLE b;");
1516        assert_ne!(hash1, hash2);
1517    }
1518
1519    #[test]
1520    fn compute_sql_hash_matches_pinned_fixture() {
1521        assert_eq!(
1522            compute_sql_hash("CREATE ROLE app LOGIN;"),
1523            "12a9743285d98ce73cfa9c840e943fc627d1fcbce22c5206fda1b21c84c1ac9c"
1524        );
1525    }
1526
1527    #[test]
1528    fn generate_plan_name_has_expected_format() {
1529        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1530        let name = generate_plan_name("my-policy", hash);
1531        assert!(name.starts_with("my-policy-plan-"));
1532        assert!(name.ends_with("-abcdef012345"));
1533        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1534        // YYYYMMDD-HHMMSS-hashprefix = 15 + 1 + 12 = 28 chars
1535        assert_eq!(suffix.len(), 28);
1536        assert_eq!(&suffix[8..9], "-");
1537        assert_eq!(&suffix[15..16], "-");
1538    }
1539
1540    #[test]
1541    fn generate_plan_name_is_idempotent_for_same_hash_in_same_second() {
1542        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1543        let name1 = generate_plan_name("my-policy", hash);
1544        let name2 = generate_plan_name("my-policy", hash);
1545        assert_eq!(name1, name2);
1546    }
1547
1548    #[test]
1549    fn generate_plan_name_truncates_on_utf8_boundary() {
1550        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1551        let name = generate_plan_name(&"é".repeat(140), hash);
1552        assert!(name.len() <= 249);
1553        assert!(name.ends_with("-abcdef012345"));
1554    }
1555
1556    #[test]
1557    fn prepare_plan_sql_keeps_small_sql_inline() {
1558        let prepared = prepare_plan_sql("plan-1", "CREATE ROLE app LOGIN;").unwrap();
1559
1560        assert!(matches!(prepared.artifact, PlanSqlArtifact::Inline(_)));
1561        assert_eq!(
1562            prepared.sql_inline(),
1563            Some("CREATE ROLE app LOGIN;".to_string())
1564        );
1565        assert!(prepared.sql_ref().is_none());
1566        assert!(!prepared.is_truncated());
1567    }
1568
1569    #[test]
1570    fn prepare_plan_sql_compresses_large_brownfield_sized_sql() {
1571        let sql = brownfield_sized_sql();
1572        assert!(sql.len() > 1_048_576);
1573
1574        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1575
1576        let PlanSqlArtifact::CompressedConfigMap {
1577            key,
1578            compressed_sql,
1579            ..
1580        } = &prepared.artifact
1581        else {
1582            panic!("expected compressed ConfigMap artifact");
1583        };
1584        assert_eq!(key, SQL_CONFIGMAP_GZIP_KEY);
1585        assert!(compressed_sql.len() < MAX_CONFIGMAP_SQL_BYTES);
1586        assert_eq!(gunzip(compressed_sql), sql);
1587        assert_eq!(
1588            prepared.sql_ref().unwrap().compression,
1589            Some(SqlCompression::Gzip)
1590        );
1591        assert_eq!(prepared.original_bytes, sql.len());
1592        assert_eq!(prepared.stored_bytes, compressed_sql.len());
1593    }
1594
1595    #[test]
1596    fn configmap_binary_data_serializes_with_one_base64_layer() {
1597        let sql = brownfield_sized_sql();
1598        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1599        let PlanSqlArtifact::CompressedConfigMap {
1600            key,
1601            compressed_sql,
1602            ..
1603        } = &prepared.artifact
1604        else {
1605            panic!("expected compressed ConfigMap artifact");
1606        };
1607        let configmap = ConfigMap {
1608            binary_data: Some(BTreeMap::from([(
1609                key.clone(),
1610                ByteString(compressed_sql.clone()),
1611            )])),
1612            ..Default::default()
1613        };
1614
1615        let encoded = serde_json::to_value(&configmap).unwrap()["binaryData"][key]
1616            .as_str()
1617            .unwrap()
1618            .to_string();
1619        let decoded = base64::engine::general_purpose::STANDARD
1620            .decode(encoded)
1621            .unwrap();
1622
1623        assert_eq!(decoded, *compressed_sql);
1624        assert_eq!(gunzip(&decoded), sql);
1625    }
1626
1627    #[test]
1628    fn prepare_plan_sql_truncates_when_compressed_sql_is_still_too_large() {
1629        let sql = deterministic_incompressible_sql(1_400_000);
1630        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1631
1632        let PlanSqlArtifact::TruncatedInline(preview) = &prepared.artifact else {
1633            panic!("expected truncated inline artifact");
1634        };
1635        assert!(preview.len() <= MAX_INLINE_SQL_BYTES);
1636        assert!(preview.contains("truncated"));
1637        assert!(prepared.sql_ref().is_none());
1638        assert!(prepared.is_truncated());
1639    }
1640
1641    #[test]
1642    fn sanitize_label_value_replaces_slashes() {
1643        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1644        assert!(!sanitized.contains('/'));
1645        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1646    }
1647
1648    #[test]
1649    fn sanitize_label_value_truncates_to_63_chars() {
1650        let long_value = "a".repeat(100);
1651        let sanitized = sanitize_label_value(&long_value);
1652        assert!(sanitized.len() <= 63);
1653    }
1654
1655    #[test]
1656    fn stale_policy_sql_configmap_without_plan_label_is_orphan() {
1657        let configmap = ConfigMap {
1658            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1659                labels: Some(BTreeMap::from([(
1660                    LABEL_POLICY.to_string(),
1661                    sanitize_label_value("test-policy"),
1662                )])),
1663                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1664                    jiff::Timestamp::from_second(0).unwrap(),
1665                )),
1666                ..Default::default()
1667            },
1668            ..Default::default()
1669        };
1670
1671        assert!(is_orphan_sql_configmap(
1672            &configmap,
1673            &BTreeSet::new(),
1674            ORPHAN_GRACE_SECS + 1
1675        ));
1676    }
1677
1678    #[test]
1679    fn stale_policy_sql_configmap_with_known_plan_label_is_not_orphan() {
1680        let plan_label = sanitize_label_value("test-policy-plan-20260506-000000-abcdef012345");
1681        let configmap = ConfigMap {
1682            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1683                labels: Some(BTreeMap::from([
1684                    (
1685                        LABEL_POLICY.to_string(),
1686                        sanitize_label_value("test-policy"),
1687                    ),
1688                    (LABEL_PLAN.to_string(), plan_label.clone()),
1689                ])),
1690                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1691                    jiff::Timestamp::from_second(0).unwrap(),
1692                )),
1693                ..Default::default()
1694            },
1695            ..Default::default()
1696        };
1697
1698        assert!(!is_orphan_sql_configmap(
1699            &configmap,
1700            &BTreeSet::from([plan_label]),
1701            ORPHAN_GRACE_SECS + 1
1702        ));
1703    }
1704
1705    #[test]
1706    fn render_redacted_sql_masks_passwords() {
1707        let changes = vec![
1708            pgroles_core::diff::Change::CreateRole {
1709                name: "app".to_string(),
1710                state: pgroles_core::model::RoleState {
1711                    login: true,
1712                    ..pgroles_core::model::RoleState::default()
1713                },
1714            },
1715            pgroles_core::diff::Change::SetPassword {
1716                name: "app".to_string(),
1717                password: "super_secret".to_string(),
1718            },
1719        ];
1720        let ctx = pgroles_core::sql::SqlContext::default();
1721        let redacted = render_redacted_sql(&changes, &ctx);
1722
1723        assert!(redacted.contains("[REDACTED]"));
1724        assert!(!redacted.contains("super_secret"));
1725        assert!(redacted.contains("CREATE ROLE"));
1726    }
1727
1728    #[test]
1729    fn render_full_sql_includes_passwords() {
1730        let changes = vec![pgroles_core::diff::Change::SetPassword {
1731            name: "app".to_string(),
1732            password: "super_secret".to_string(),
1733        }];
1734        let ctx = pgroles_core::sql::SqlContext::default();
1735        let full = render_full_sql(&changes, &ctx);
1736
1737        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1738    }
1739
1740    #[test]
1741    fn now_epoch_secs_returns_plausible_value() {
1742        let now = now_epoch_secs();
1743        // Should be after 2025-01-01 and before 2100-01-01.
1744        let y2025 = 1_735_689_600_i64;
1745        let y2100 = 4_102_444_800_i64;
1746        assert!(
1747            now > y2025 && now < y2100,
1748            "epoch secs {now} should be between 2025 and 2100"
1749        );
1750    }
1751
1752    fn brownfield_sized_sql() -> String {
1753        let mut sql = String::new();
1754        for schema in 0..33 {
1755            for profile in ["reader", "writer", "owner", "cdc"] {
1756                let role = format!("schema_{schema}_{profile}");
1757                sql.push_str(&format!(
1758                    "CREATE ROLE \"{role}\" LOGIN;\nCOMMENT ON ROLE \"{role}\" IS 'Generated from profile {profile} for brownfield migration schema {schema} with cdc ownership directives and review metadata';\n"
1759                ));
1760                for relkind in ["TABLES", "SEQUENCES", "FUNCTIONS"] {
1761                    sql.push_str(&format!(
1762                        "GRANT SELECT ON ALL {relkind} IN SCHEMA \"schema_{schema}\" TO \"{role}\";\n"
1763                    ));
1764                }
1765                for owner in 0..20 {
1766                    sql.push_str(&format!(
1767                        "ALTER DEFAULT PRIVILEGES FOR ROLE \"owner_{owner}\" IN SCHEMA \"schema_{schema}\" GRANT SELECT ON TABLES TO \"{role}\";\n"
1768                    ));
1769                }
1770            }
1771        }
1772        for member in 0..70 {
1773            sql.push_str(&format!(
1774                "GRANT \"group_{member}\" TO \"service_login_{}\";\n",
1775                member % 20
1776            ));
1777        }
1778        while sql.len() <= 1_100_000 {
1779            sql.push_str("-- brownfield migration padding for large plan regression\n");
1780        }
1781        sql
1782    }
1783
1784    fn deterministic_incompressible_sql(target_bytes: usize) -> String {
1785        let mut state = 0x1234_5678_u64;
1786        let mut sql = String::with_capacity(target_bytes);
1787        while sql.len() < target_bytes {
1788            state ^= state << 13;
1789            state ^= state >> 7;
1790            state ^= state << 17;
1791            let value = (state % 62) as u8;
1792            let ch = match value {
1793                0..=9 => b'0' + value,
1794                10..=35 => b'a' + (value - 10),
1795                _ => b'A' + (value - 36),
1796            };
1797            sql.push(ch as char);
1798            if sql.len().is_multiple_of(120) {
1799                sql.push('\n');
1800            }
1801        }
1802        sql
1803    }
1804
1805    fn gunzip(bytes: &[u8]) -> String {
1806        let mut decoder = GzDecoder::new(bytes);
1807        let mut decoded = String::new();
1808        decoder.read_to_string(&mut decoded).unwrap();
1809        decoded
1810    }
1811}