Skip to main content

pgroles_operator/
plan.rs

1//! Plan lifecycle management for `PostgresPolicyPlan` resources.
2//!
3//! Handles creating, deduplicating, approving, executing, and cleaning up
4//! reconciliation plans. Plans represent computed SQL change sets that may
5//! require explicit approval before execution against a database.
6
7use std::collections::BTreeMap;
8
9use k8s_openapi::api::core::v1::ConfigMap;
10use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
11use kube::api::{Api, ListParams, Patch, PatchParams, PostParams};
12use kube::{Client, Resource, ResourceExt};
13use sha2::{Digest, Sha256};
14use tracing::info;
15
16use crate::crd::{
17    ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_POLICY,
18    PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
19    PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
20    PostgresPolicyPlanStatus, SqlRef,
21};
22use crate::reconciler::ReconcileError;
23
24/// Result of plan creation — distinguishes genuinely new plans from
25/// deduplication hits so callers can decide whether to emit events.
26#[derive(Debug, Clone)]
27pub enum PlanCreationResult {
28    /// A new plan was created with the given name.
29    Created(String),
30    /// An existing plan with the same hash was found (deduplication).
31    Deduplicated(String),
32}
33
34impl PlanCreationResult {
35    /// Return the plan name regardless of variant.
36    pub fn plan_name(&self) -> &str {
37        match self {
38            PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
39        }
40    }
41
42    /// True when a new plan was actually created.
43    pub fn is_created(&self) -> bool {
44        matches!(self, PlanCreationResult::Created(_))
45    }
46}
47
48/// Maximum inline SQL size in plan status before spilling to a ConfigMap.
49const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;
50
51/// ConfigMap data key for the SQL content.
52const SQL_CONFIGMAP_KEY: &str = "plan.sql";
53
54/// Default maximum number of historical plans to retain per policy.
55const DEFAULT_MAX_PLANS: usize = 10;
56
57/// How recently a Failed plan must have been created (in seconds) for the
58/// dedup check to consider it a match. Plans older than this are ignored so
59/// that retries after the user fixes the environment are not blocked.
60const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
61
62// ---------------------------------------------------------------------------
63// Plan approval check
64// ---------------------------------------------------------------------------
65
66/// Result of checking a plan's approval annotations.
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum PlanApprovalState {
69    Pending,
70    Approved,
71    Rejected,
72}
73
74/// Check the approval state of a plan by inspecting its annotations.
75///
76/// Rejection takes priority over approval: if both annotations are set,
77/// the plan is considered rejected.
78pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
79    let annotations = plan.metadata.annotations.as_ref();
80
81    let rejected = annotations
82        .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
83        .map(|v| v == "true")
84        .unwrap_or(false);
85
86    if rejected {
87        return PlanApprovalState::Rejected;
88    }
89
90    let approved = annotations
91        .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
92        .map(|v| v == "true")
93        .unwrap_or(false);
94
95    if approved {
96        return PlanApprovalState::Approved;
97    }
98
99    PlanApprovalState::Pending
100}
101
102// ---------------------------------------------------------------------------
103// Plan creation
104// ---------------------------------------------------------------------------
105
106/// Create or deduplicate a `PostgresPolicyPlan` for the given policy and changes.
107///
108/// Returns the name of the plan resource (either existing or newly created).
109///
110/// This function:
111/// 1. Renders the full executable SQL from the changes
112/// 2. Computes SHA-256 of the full SQL (before any redaction/truncation)
113/// 3. Checks for an existing Pending plan with the same hash (dedup)
114/// 4. Marks any existing Pending plan with a different hash as Superseded
115/// 5. Creates the new plan resource with ownerReferences
116/// 6. Creates a ConfigMap for large SQL, or stores inline
117/// 7. Updates the plan status
118#[allow(clippy::too_many_arguments)]
119pub async fn create_or_update_plan(
120    client: &Client,
121    policy: &PostgresPolicy,
122    changes: &[pgroles_core::diff::Change],
123    sql_context: &pgroles_core::sql::SqlContext,
124    inspect_config: &pgroles_inspect::InspectConfig,
125    reconciliation_mode: CrdReconciliationMode,
126    database_identity: &str,
127    change_summary: &ChangeSummary,
128) -> Result<PlanCreationResult, ReconcileError> {
129    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
130    let policy_name = policy.name_any();
131    let generation = policy.metadata.generation.unwrap_or(0);
132
133    // 1. Render the full executable SQL (not redacted).
134    let full_sql = render_full_sql(changes, sql_context);
135
136    // 2. Compute SHA-256 hash of the full SQL.
137    let sql_hash = compute_sql_hash(&full_sql);
138
139    // 3. Render redacted SQL for display (passwords masked).
140    let redacted_sql = render_redacted_sql(changes, sql_context);
141
142    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
143
144    // 4. List existing plans for this policy.
145    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
146    let existing_plans = plans_api
147        .list(&ListParams::default().labels(&label_selector))
148        .await?;
149
150    // 5. Check for duplicate pending plan with same hash.
151    for plan in &existing_plans {
152        if let Some(ref status) = plan.status
153            && status.phase == PlanPhase::Pending
154            && status.sql_hash.as_deref() == Some(&sql_hash)
155        {
156            // Identical plan already exists — return early (deduplicated).
157            let plan_name = plan.name_any();
158            info!(
159                plan = %plan_name,
160                policy = %policy_name,
161                "existing pending plan has identical SQL hash, skipping creation"
162            );
163            return Ok(PlanCreationResult::Deduplicated(plan_name));
164        }
165    }
166
167    // 5b. Check for recently-failed plan with the same hash. If a plan with
168    // this exact SQL already failed within the dedup window, creating another
169    // identical one is pointless — it would produce the same error. The window
170    // ensures we don't block retries after the user fixes the environment.
171    //
172    // Uses `status.failed_at` (not `creation_timestamp`) so that plans which
173    // waited for approval before failing are measured from the failure time.
174    let now_ts = now_epoch_secs();
175    for plan in &existing_plans {
176        if let Some(ref status) = plan.status
177            && status.phase == PlanPhase::Failed
178            && status.sql_hash.as_deref() == Some(&sql_hash)
179        {
180            let failed_ts = status
181                .failed_at
182                .as_deref()
183                .and_then(parse_rfc3339_epoch_secs)
184                .unwrap_or(0);
185            if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
186                let plan_name = plan.name_any();
187                info!(
188                    plan = %plan_name,
189                    policy = %policy_name,
190                    age_secs = now_ts - failed_ts,
191                    "recently-failed plan has identical SQL hash, skipping creation"
192                );
193                return Ok(PlanCreationResult::Deduplicated(plan_name));
194            }
195        }
196    }
197
198    // 6. Mark any existing Pending plans as Superseded.
199    for plan in &existing_plans {
200        if let Some(ref status) = plan.status
201            && status.phase == PlanPhase::Pending
202        {
203            let plan_name = plan.name_any();
204            info!(
205                plan = %plan_name,
206                policy = %policy_name,
207                "marking existing pending plan as Superseded"
208            );
209            let superseded_status = PostgresPolicyPlanStatus {
210                phase: PlanPhase::Superseded,
211                ..status.clone()
212            };
213            let patch = serde_json::json!({ "status": superseded_status });
214            plans_api
215                .patch_status(
216                    &plan_name,
217                    &PatchParams::apply("pgroles-operator"),
218                    &Patch::Merge(&patch),
219                )
220                .await?;
221        }
222    }
223
224    // 7. Generate a unique plan name using a timestamp.
225    let plan_name = generate_plan_name(&policy_name);
226
227    // 8. Build ownerReference pointing to the parent policy.
228    let owner_ref = build_owner_reference(policy);
229
230    // 9. Create the plan resource.
231    let plan = PostgresPolicyPlan::new(
232        &plan_name,
233        PostgresPolicyPlanSpec {
234            policy_ref: PolicyPlanRef {
235                name: policy_name.clone(),
236            },
237            policy_generation: generation,
238            reconciliation_mode,
239            owned_roles: inspect_config.managed_roles.clone(),
240            owned_schemas: inspect_config.managed_schemas.clone(),
241            managed_database_identity: database_identity.to_string(),
242        },
243    );
244    let mut plan = plan;
245    plan.metadata.namespace = Some(namespace.clone());
246    plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
247    plan.metadata.labels = Some(BTreeMap::from([
248        (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
249        (
250            LABEL_DATABASE_IDENTITY.to_string(),
251            sanitize_label_value(database_identity),
252        ),
253    ]));
254
255    let created_plan = plans_api.create(&PostParams::default(), &plan).await?;
256    let plan_name = created_plan.name_any();
257
258    // 10. Handle SQL storage: inline or ConfigMap.
259    let (sql_inline, sql_ref) = if redacted_sql.len() <= MAX_INLINE_SQL_BYTES {
260        (Some(redacted_sql), None)
261    } else {
262        // Create a ConfigMap for the full redacted SQL.
263        let configmap_name = format!("{plan_name}-sql");
264        let configmap = ConfigMap {
265            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
266                name: Some(configmap_name.clone()),
267                namespace: Some(namespace.clone()),
268                owner_references: Some(vec![build_plan_owner_reference(&created_plan)]),
269                labels: Some(BTreeMap::from([(
270                    LABEL_POLICY.to_string(),
271                    sanitize_label_value(&policy_name),
272                )])),
273                ..Default::default()
274            },
275            data: Some(BTreeMap::from([(
276                SQL_CONFIGMAP_KEY.to_string(),
277                redacted_sql,
278            )])),
279            ..Default::default()
280        };
281
282        let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
283        configmaps_api
284            .create(&PostParams::default(), &configmap)
285            .await?;
286
287        (
288            None,
289            Some(SqlRef {
290                name: configmap_name,
291                key: SQL_CONFIGMAP_KEY.to_string(),
292            }),
293        )
294    };
295
296    // 11. Update plan status.
297    let plan_status = PostgresPolicyPlanStatus {
298        phase: PlanPhase::Pending,
299        conditions: vec![
300            PolicyCondition {
301                condition_type: "Computed".to_string(),
302                status: "True".to_string(),
303                reason: Some("PlanComputed".to_string()),
304                message: Some(format!(
305                    "Plan computed with {} change(s)",
306                    change_summary.total
307                )),
308                last_transition_time: Some(crate::crd::now_rfc3339()),
309            },
310            PolicyCondition {
311                condition_type: "Approved".to_string(),
312                status: "False".to_string(),
313                reason: Some("PendingApproval".to_string()),
314                message: Some("Plan awaiting approval".to_string()),
315                last_transition_time: Some(crate::crd::now_rfc3339()),
316            },
317        ],
318        change_summary: Some(change_summary.clone()),
319        sql_ref,
320        sql_inline,
321        computed_at: Some(crate::crd::now_rfc3339()),
322        applied_at: None,
323        last_error: None,
324        sql_hash: Some(sql_hash),
325        applying_since: None,
326        failed_at: None,
327    };
328
329    let status_patch = serde_json::json!({ "status": plan_status });
330    plans_api
331        .patch_status(
332            &plan_name,
333            &PatchParams::apply("pgroles-operator"),
334            &Patch::Merge(&status_patch),
335        )
336        .await?;
337
338    info!(
339        plan = %plan_name,
340        policy = %policy_name,
341        changes = change_summary.total,
342        "created new plan"
343    );
344
345    Ok(PlanCreationResult::Created(plan_name))
346}
347
348// ---------------------------------------------------------------------------
349// Plan execution
350// ---------------------------------------------------------------------------
351
352/// Execute an approved plan against the database.
353///
354/// Reads SQL from inline status or the referenced ConfigMap, executes it in
355/// a transaction, and updates the plan status to Applied or Failed.
356pub async fn execute_plan(
357    client: &Client,
358    plan: &PostgresPolicyPlan,
359    pool: &sqlx::PgPool,
360    sql_context: &pgroles_core::sql::SqlContext,
361    changes: &[pgroles_core::diff::Change],
362) -> Result<(), ReconcileError> {
363    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
364    let plan_name = plan.name_any();
365    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
366
367    // Update phase to Applying.
368    update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
369
370    // Execute the SQL in a transaction using the original changes (not stored SQL).
371    // This ensures we use the actual executable SQL including real passwords,
372    // not the redacted version stored in the plan.
373    let result = execute_changes_in_transaction(pool, changes, sql_context).await;
374
375    match result {
376        Ok(statements_executed) => {
377            // Update plan status to Applied.
378            let mut applied_status = plan.status.clone().unwrap_or_default();
379            applied_status.phase = PlanPhase::Applied;
380            applied_status.applied_at = Some(crate::crd::now_rfc3339());
381            applied_status.last_error = None;
382            set_plan_condition(
383                &mut applied_status.conditions,
384                "Approved",
385                "True",
386                "Approved",
387                "Plan approved and executed",
388            );
389
390            let patch = serde_json::json!({ "status": applied_status });
391            plans_api
392                .patch_status(
393                    &plan_name,
394                    &PatchParams::apply("pgroles-operator"),
395                    &Patch::Merge(&patch),
396                )
397                .await?;
398
399            info!(
400                plan = %plan_name,
401                statements = statements_executed,
402                "plan executed successfully"
403            );
404            Ok(())
405        }
406        Err(err) => {
407            // Update plan status to Failed.
408            let error_message = err.to_string();
409            let mut failed_status = plan.status.clone().unwrap_or_default();
410            failed_status.phase = PlanPhase::Failed;
411            failed_status.last_error = Some(error_message);
412            failed_status.failed_at = Some(crate::crd::now_rfc3339());
413
414            let patch = serde_json::json!({ "status": failed_status });
415            if let Err(status_err) = plans_api
416                .patch_status(
417                    &plan_name,
418                    &PatchParams::apply("pgroles-operator"),
419                    &Patch::Merge(&patch),
420                )
421                .await
422            {
423                tracing::warn!(
424                    plan = %plan_name,
425                    %status_err,
426                    "failed to update plan status to Failed"
427                );
428            }
429
430            Err(err)
431        }
432    }
433}
434
435/// Execute SQL changes in a database transaction.
436///
437/// Returns the number of statements executed on success.
438async fn execute_changes_in_transaction(
439    pool: &sqlx::PgPool,
440    changes: &[pgroles_core::diff::Change],
441    sql_context: &pgroles_core::sql::SqlContext,
442) -> Result<usize, ReconcileError> {
443    let mut transaction = pool.begin().await?;
444    let mut statements_executed = 0usize;
445
446    for change in changes {
447        let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
448        for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
449            if is_sensitive {
450                tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
451            } else {
452                tracing::debug!(%sql, "executing");
453            }
454            sqlx::query(&sql).execute(transaction.as_mut()).await?;
455            statements_executed += 1;
456        }
457    }
458
459    transaction.commit().await?;
460    Ok(statements_executed)
461}
462
463// ---------------------------------------------------------------------------
464// Plan cleanup / retention
465// ---------------------------------------------------------------------------
466
467/// Clean up old plans for a policy, retaining at most `max_plans` terminal plans.
468///
469/// Terminal plans are those in Applied, Failed, Superseded, or Rejected phase.
470/// Pending and Approved plans are never cleaned up by this function.
471pub async fn cleanup_old_plans(
472    client: &Client,
473    policy: &PostgresPolicy,
474    max_plans: Option<usize>,
475) -> Result<(), ReconcileError> {
476    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
477    let policy_name = policy.name_any();
478    let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
479
480    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
481    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
482    let existing_plans = plans_api
483        .list(&ListParams::default().labels(&label_selector))
484        .await?;
485
486    // Collect terminal plans sorted by creation timestamp (oldest first).
487    let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
488        .iter()
489        .filter(|plan| {
490            plan.status
491                .as_ref()
492                .map(|s| {
493                    matches!(
494                        s.phase,
495                        PlanPhase::Applied
496                            | PlanPhase::Failed
497                            | PlanPhase::Superseded
498                            | PlanPhase::Rejected
499                    )
500                })
501                .unwrap_or(false)
502        })
503        .collect();
504
505    if terminal_plans.len() <= max_plans {
506        return Ok(());
507    }
508
509    // Sort by creation timestamp ascending (oldest first).
510    terminal_plans.sort_by(|a, b| {
511        let a_time = a.metadata.creation_timestamp.as_ref();
512        let b_time = b.metadata.creation_timestamp.as_ref();
513        a_time.cmp(&b_time)
514    });
515
516    let plans_to_delete = terminal_plans.len() - max_plans;
517    for plan in terminal_plans.into_iter().take(plans_to_delete) {
518        let plan_name = plan.name_any();
519        info!(
520            plan = %plan_name,
521            policy = %policy_name,
522            "cleaning up old plan"
523        );
524        if let Err(err) = plans_api.delete(&plan_name, &Default::default()).await {
525            tracing::warn!(
526                plan = %plan_name,
527                %err,
528                "failed to delete old plan during cleanup"
529            );
530        }
531    }
532
533    Ok(())
534}
535
536// ---------------------------------------------------------------------------
537// Helpers
538// ---------------------------------------------------------------------------
539
540/// Render the full executable SQL from changes (including real passwords).
541pub(crate) fn render_full_sql(
542    changes: &[pgroles_core::diff::Change],
543    sql_context: &pgroles_core::sql::SqlContext,
544) -> String {
545    changes
546        .iter()
547        .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
548        .collect::<Vec<_>>()
549        .join("\n")
550}
551
552/// Render redacted SQL for display (passwords replaced with [REDACTED]).
553fn render_redacted_sql(
554    changes: &[pgroles_core::diff::Change],
555    sql_context: &pgroles_core::sql::SqlContext,
556) -> String {
557    changes
558        .iter()
559        .flat_map(|change| {
560            if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
561                vec![format!(
562                    "ALTER ROLE {} PASSWORD '[REDACTED]';",
563                    pgroles_core::sql::quote_ident(name)
564                )]
565            } else {
566                pgroles_core::sql::render_statements_with_context(change, sql_context)
567            }
568        })
569        .collect::<Vec<_>>()
570        .join("\n")
571}
572
573/// Compute SHA-256 hash of the SQL string as a hex digest.
574pub(crate) fn compute_sql_hash(sql: &str) -> String {
575    let mut hasher = Sha256::new();
576    hasher.update(sql.as_bytes());
577    format!("{:x}", hasher.finalize())
578}
579
580/// Generate a plan name from policy name and current timestamp.
581///
582/// Format: `{policy-name}-plan-{YYYYMMDD-HHMMSS}-{millis}{random}`
583///
584/// A millisecond and random suffix is appended to avoid collisions when the
585/// operator retries within the same second.
586fn generate_plan_name(policy_name: &str) -> String {
587    let now = std::time::SystemTime::now()
588        .duration_since(std::time::UNIX_EPOCH)
589        .unwrap_or_default();
590    let timestamp = format_timestamp_compact();
591    let millis = now.subsec_millis();
592    let random_suffix: u32 = rand::random::<u32>() % 1000;
593    let suffix = format!("{millis:03}{random_suffix:03}");
594    // Kubernetes names must be <= 253 chars and DNS-compatible.
595    // Reserve 4 chars for the potential "-sql" ConfigMap suffix.
596    let max_name_len = 253 - 4; // 249
597    let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
598    let prefix = if policy_name.len() > max_prefix_len {
599        &policy_name[..max_prefix_len]
600    } else {
601        policy_name
602    };
603    format!("{prefix}-plan-{timestamp}-{suffix}")
604}
605
606/// Format the current UTC time as `YYYYMMDD-HHMMSS`.
607fn format_timestamp_compact() -> String {
608    use std::time::SystemTime;
609    let now = SystemTime::now()
610        .duration_since(SystemTime::UNIX_EPOCH)
611        .unwrap_or_default();
612    let secs = now.as_secs();
613    let (year, month, day) = crate::crd::days_to_date(secs / 86400);
614    let remaining = secs % 86400;
615    let hours = remaining / 3600;
616    let minutes = (remaining % 3600) / 60;
617    let seconds = remaining % 60;
618    format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
619}
620
621/// Sanitize a string for use as a Kubernetes label value.
622///
623/// Label values must be <= 63 chars and match `[a-z0-9A-Z._-]*`.
624fn sanitize_label_value(value: &str) -> String {
625    let sanitized: String = value
626        .chars()
627        .map(|c| {
628            if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
629                c
630            } else {
631                '_'
632            }
633        })
634        .take(63)
635        .collect();
636    sanitized
637}
638
639/// Current time as Unix epoch seconds (for dedup window checks).
640fn now_epoch_secs() -> i64 {
641    std::time::SystemTime::now()
642        .duration_since(std::time::UNIX_EPOCH)
643        .unwrap_or_default()
644        .as_secs() as i64
645}
646
647/// Parse an RFC 3339 timestamp string to Unix epoch seconds.
648/// Returns `None` if parsing fails.
649fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
650    // Use jiff (already a transitive dep via k8s-openapi) for RFC 3339 parsing.
651    rfc3339
652        .parse::<jiff::Timestamp>()
653        .ok()
654        .map(|t| t.as_second())
655}
656
657/// Build an OwnerReference pointing from a plan to its parent policy.
658fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
659    OwnerReference {
660        api_version: PostgresPolicy::api_version(&()).to_string(),
661        kind: PostgresPolicy::kind(&()).to_string(),
662        name: policy.name_any(),
663        uid: policy.metadata.uid.clone().unwrap_or_default(),
664        controller: Some(true),
665        block_owner_deletion: Some(true),
666    }
667}
668
669/// Build an OwnerReference pointing from a ConfigMap to its parent plan.
670fn build_plan_owner_reference(plan: &PostgresPolicyPlan) -> OwnerReference {
671    OwnerReference {
672        api_version: PostgresPolicyPlan::api_version(&()).to_string(),
673        kind: PostgresPolicyPlan::kind(&()).to_string(),
674        name: plan.name_any(),
675        uid: plan.metadata.uid.clone().unwrap_or_default(),
676        controller: Some(true),
677        block_owner_deletion: Some(true),
678    }
679}
680
681/// Update the phase field on a plan's status.
682///
683/// When transitioning to `Applying`, also sets `applying_since` for stuck
684/// plan detection.
685async fn update_plan_phase(
686    plans_api: &Api<PostgresPolicyPlan>,
687    plan_name: &str,
688    phase: PlanPhase,
689) -> Result<(), ReconcileError> {
690    let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
691    if phase == PlanPhase::Applying {
692        patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
693    }
694    plans_api
695        .patch_status(
696            plan_name,
697            &PatchParams::apply("pgroles-operator"),
698            &Patch::Merge(&patch_value),
699        )
700        .await?;
701    Ok(())
702}
703
704/// Set or update a condition in a conditions list.
705///
706/// Preserves `last_transition_time` when the status value is unchanged
707/// (only reason/message changed), matching Kubernetes condition conventions.
708fn set_plan_condition(
709    conditions: &mut Vec<PolicyCondition>,
710    condition_type: &str,
711    status: &str,
712    reason: &str,
713    message: &str,
714) {
715    let transition_time = if let Some(existing) = conditions
716        .iter()
717        .find(|c| c.condition_type == condition_type)
718    {
719        if existing.status == status {
720            existing.last_transition_time.clone()
721        } else {
722            Some(crate::crd::now_rfc3339())
723        }
724    } else {
725        Some(crate::crd::now_rfc3339())
726    };
727
728    let condition = PolicyCondition {
729        condition_type: condition_type.to_string(),
730        status: status.to_string(),
731        reason: Some(reason.to_string()),
732        message: Some(message.to_string()),
733        last_transition_time: transition_time,
734    };
735    if let Some(existing) = conditions
736        .iter_mut()
737        .find(|c| c.condition_type == condition_type)
738    {
739        *existing = condition;
740    } else {
741        conditions.push(condition);
742    }
743}
744
745/// Update the parent policy's `current_plan_ref` in status.
746pub async fn update_policy_plan_ref(
747    client: &Client,
748    policy: &PostgresPolicy,
749    plan_name: &str,
750) -> Result<(), ReconcileError> {
751    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
752    let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
753
754    let patch = serde_json::json!({
755        "status": {
756            "current_plan_ref": PlanReference {
757                name: plan_name.to_string(),
758            }
759        }
760    });
761
762    policy_api
763        .patch_status(
764            &policy.name_any(),
765            &PatchParams::apply("pgroles-operator"),
766            &Patch::Merge(&patch),
767        )
768        .await?;
769
770    Ok(())
771}
772
773/// Look up the current actionable plan for a policy, if any.
774///
775/// An actionable plan is one in `Pending` or `Approved` phase — i.e. a plan
776/// that the reconciler should evaluate for approval/execution.
777pub async fn get_current_actionable_plan(
778    client: &Client,
779    policy: &PostgresPolicy,
780) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
781    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
782    let policy_name = policy.name_any();
783
784    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
785    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
786    let existing_plans = plans_api
787        .list(&ListParams::default().labels(&label_selector))
788        .await?;
789
790    // Find the most recent actionable plan (Pending or Approved, by creation time).
791    let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
792        .into_iter()
793        .filter(|plan| {
794            plan.status
795                .as_ref()
796                .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
797                .unwrap_or(false)
798        })
799        .collect();
800
801    pending_plans.sort_by(|a, b| {
802        let a_time = a.metadata.creation_timestamp.as_ref();
803        let b_time = b.metadata.creation_timestamp.as_ref();
804        b_time.cmp(&a_time) // newest first
805    });
806
807    Ok(pending_plans.into_iter().next())
808}
809
810/// Look up the most recent plan for a policy in a given phase.
811pub async fn get_plan_by_phase(
812    client: &Client,
813    policy: &PostgresPolicy,
814    target_phase: PlanPhase,
815) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
816    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
817    let policy_name = policy.name_any();
818
819    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
820    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
821    let existing_plans = plans_api
822        .list(&ListParams::default().labels(&label_selector))
823        .await?;
824
825    let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
826        .into_iter()
827        .filter(|plan| {
828            plan.status
829                .as_ref()
830                .map(|s| s.phase == target_phase)
831                .unwrap_or(false)
832        })
833        .collect();
834
835    matching_plans.sort_by(|a, b| {
836        let a_time = a.metadata.creation_timestamp.as_ref();
837        let b_time = b.metadata.creation_timestamp.as_ref();
838        b_time.cmp(&a_time) // newest first
839    });
840
841    Ok(matching_plans.into_iter().next())
842}
843
844/// Mark a plan as Failed with a given error message.
845pub async fn mark_plan_failed(
846    client: &Client,
847    plan: &PostgresPolicyPlan,
848    error_message: &str,
849) -> Result<(), ReconcileError> {
850    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
851    let plan_name = plan.name_any();
852    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
853
854    let mut status = plan.status.clone().unwrap_or_default();
855    status.phase = PlanPhase::Failed;
856    status.last_error = Some(error_message.to_string());
857    status.failed_at = Some(crate::crd::now_rfc3339());
858
859    let patch = serde_json::json!({ "status": status });
860    plans_api
861        .patch_status(
862            &plan_name,
863            &PatchParams::apply("pgroles-operator"),
864            &Patch::Merge(&patch),
865        )
866        .await?;
867
868    info!(
869        plan = %plan_name,
870        "marked stuck Applying plan as Failed"
871    );
872
873    Ok(())
874}
875
876/// Mark a plan as Approved.
877///
878/// Callers provide `reason` and `message` to distinguish auto-approval from
879/// manual approval in the plan's conditions.
880pub async fn mark_plan_approved(
881    client: &Client,
882    plan: &PostgresPolicyPlan,
883    reason: &str,
884    message: &str,
885) -> Result<(), ReconcileError> {
886    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
887    let plan_name = plan.name_any();
888    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
889
890    let mut status = plan.status.clone().unwrap_or_default();
891    status.phase = PlanPhase::Approved;
892    set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
893
894    let patch = serde_json::json!({ "status": status });
895    plans_api
896        .patch_status(
897            &plan_name,
898            &PatchParams::apply("pgroles-operator"),
899            &Patch::Merge(&patch),
900        )
901        .await?;
902
903    Ok(())
904}
905
906/// Mark a plan as Rejected.
907pub async fn mark_plan_rejected(
908    client: &Client,
909    plan: &PostgresPolicyPlan,
910) -> Result<(), ReconcileError> {
911    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
912    let plan_name = plan.name_any();
913    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
914
915    let mut status = plan.status.clone().unwrap_or_default();
916    status.phase = PlanPhase::Rejected;
917    set_plan_condition(
918        &mut status.conditions,
919        "Approved",
920        "False",
921        "Rejected",
922        "Plan rejected via annotation",
923    );
924
925    let patch = serde_json::json!({ "status": status });
926    plans_api
927        .patch_status(
928            &plan_name,
929            &PatchParams::apply("pgroles-operator"),
930            &Patch::Merge(&patch),
931        )
932        .await?;
933
934    Ok(())
935}
936
937/// Mark a plan as Superseded (database state changed since approval).
938pub async fn mark_plan_superseded(
939    client: &Client,
940    plan: &PostgresPolicyPlan,
941) -> Result<(), ReconcileError> {
942    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
943    let plan_name = plan.name_any();
944    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
945
946    let mut status = plan.status.clone().unwrap_or_default();
947    status.phase = PlanPhase::Superseded;
948    set_plan_condition(
949        &mut status.conditions,
950        "Approved",
951        "False",
952        "Superseded",
953        "Database state changed since plan was approved",
954    );
955
956    let patch = serde_json::json!({ "status": status });
957    plans_api
958        .patch_status(
959            &plan_name,
960            &PatchParams::apply("pgroles-operator"),
961            &Patch::Merge(&patch),
962        )
963        .await?;
964
965    Ok(())
966}
967
968// ---------------------------------------------------------------------------
969// Tests
970// ---------------------------------------------------------------------------
971
972#[cfg(test)]
973mod tests {
974    use super::*;
975    use crate::crd::CrdReconciliationMode;
976
977    fn test_plan(
978        name: &str,
979        phase: PlanPhase,
980        annotations: Option<BTreeMap<String, String>>,
981    ) -> PostgresPolicyPlan {
982        let mut plan = PostgresPolicyPlan::new(
983            name,
984            PostgresPolicyPlanSpec {
985                policy_ref: PolicyPlanRef {
986                    name: "test-policy".to_string(),
987                },
988                policy_generation: 1,
989                reconciliation_mode: CrdReconciliationMode::Authoritative,
990                owned_roles: vec!["role-a".to_string()],
991                owned_schemas: vec!["public".to_string()],
992                managed_database_identity: "default/db/DATABASE_URL".to_string(),
993            },
994        );
995        plan.metadata.namespace = Some("default".to_string());
996        plan.metadata.annotations = annotations;
997        plan.status = Some(PostgresPolicyPlanStatus {
998            phase,
999            ..Default::default()
1000        });
1001        plan
1002    }
1003
1004    #[test]
1005    fn check_plan_approval_pending_when_no_annotations() {
1006        let plan = test_plan("plan-1", PlanPhase::Pending, None);
1007        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1008    }
1009
1010    #[test]
1011    fn check_plan_approval_approved_with_annotation() {
1012        let annotations =
1013            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1014        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1015        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1016    }
1017
1018    #[test]
1019    fn check_plan_approval_rejected_with_annotation() {
1020        let annotations =
1021            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1022        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1023        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1024    }
1025
1026    #[test]
1027    fn check_plan_approval_rejected_wins_over_approved() {
1028        let annotations = BTreeMap::from([
1029            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1030            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1031        ]);
1032        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1033        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1034    }
1035
1036    #[test]
1037    fn check_plan_approval_non_true_value_is_pending() {
1038        let annotations =
1039            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1040        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1041        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1042    }
1043
1044    #[test]
1045    fn compute_sql_hash_is_deterministic() {
1046        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1047        let hash1 = compute_sql_hash(sql);
1048        let hash2 = compute_sql_hash(sql);
1049        assert_eq!(hash1, hash2);
1050        assert_eq!(hash1.len(), 64); // SHA-256 hex digest is 64 chars
1051    }
1052
1053    #[test]
1054    fn compute_sql_hash_differs_for_different_sql() {
1055        let hash1 = compute_sql_hash("CREATE ROLE a;");
1056        let hash2 = compute_sql_hash("CREATE ROLE b;");
1057        assert_ne!(hash1, hash2);
1058    }
1059
1060    #[test]
1061    fn generate_plan_name_has_expected_format() {
1062        let name = generate_plan_name("my-policy");
1063        assert!(name.starts_with("my-policy-plan-"));
1064        // Should be "my-policy-plan-YYYYMMDD-HHMMSS-MMMRRR"
1065        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1066        // YYYYMMDD-HHMMSS-MMMRRR = 15 + 1 + 6 = 22 chars
1067        assert_eq!(suffix.len(), 22);
1068        assert_eq!(&suffix[8..9], "-");
1069        assert_eq!(&suffix[15..16], "-");
1070    }
1071
1072    #[test]
1073    fn generate_plan_name_is_unique_across_calls() {
1074        let name1 = generate_plan_name("my-policy");
1075        let name2 = generate_plan_name("my-policy");
1076        // With millisecond + random suffix, collisions are extremely unlikely
1077        // (this test may very rarely fail, but demonstrates the intent).
1078        assert_ne!(name1, name2);
1079    }
1080
1081    #[test]
1082    fn sanitize_label_value_replaces_slashes() {
1083        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1084        assert!(!sanitized.contains('/'));
1085        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1086    }
1087
1088    #[test]
1089    fn sanitize_label_value_truncates_to_63_chars() {
1090        let long_value = "a".repeat(100);
1091        let sanitized = sanitize_label_value(&long_value);
1092        assert!(sanitized.len() <= 63);
1093    }
1094
1095    #[test]
1096    fn render_redacted_sql_masks_passwords() {
1097        let changes = vec![
1098            pgroles_core::diff::Change::CreateRole {
1099                name: "app".to_string(),
1100                state: pgroles_core::model::RoleState {
1101                    login: true,
1102                    ..pgroles_core::model::RoleState::default()
1103                },
1104            },
1105            pgroles_core::diff::Change::SetPassword {
1106                name: "app".to_string(),
1107                password: "super_secret".to_string(),
1108            },
1109        ];
1110        let ctx = pgroles_core::sql::SqlContext::default();
1111        let redacted = render_redacted_sql(&changes, &ctx);
1112
1113        assert!(redacted.contains("[REDACTED]"));
1114        assert!(!redacted.contains("super_secret"));
1115        assert!(redacted.contains("CREATE ROLE"));
1116    }
1117
1118    #[test]
1119    fn render_full_sql_includes_passwords() {
1120        let changes = vec![pgroles_core::diff::Change::SetPassword {
1121            name: "app".to_string(),
1122            password: "super_secret".to_string(),
1123        }];
1124        let ctx = pgroles_core::sql::SqlContext::default();
1125        let full = render_full_sql(&changes, &ctx);
1126
1127        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1128    }
1129
1130    #[test]
1131    fn now_epoch_secs_returns_plausible_value() {
1132        let now = now_epoch_secs();
1133        // Should be after 2025-01-01 and before 2100-01-01.
1134        let y2025 = 1_735_689_600_i64;
1135        let y2100 = 4_102_444_800_i64;
1136        assert!(
1137            now > y2025 && now < y2100,
1138            "epoch secs {now} should be between 2025 and 2100"
1139        );
1140    }
1141}