Skip to main content

pgroles_operator/
plan.rs

1//! Plan lifecycle management for `PostgresPolicyPlan` resources.
2//!
3//! Handles creating, deduplicating, approving, executing, and cleaning up
4//! reconciliation plans. Plans represent computed SQL change sets that may
5//! require explicit approval before execution against a database.
6
7use std::collections::BTreeMap;
8
9use k8s_openapi::api::core::v1::ConfigMap;
10use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
11use kube::api::{Api, ListParams, Patch, PatchParams, PostParams};
12use kube::{Client, Resource, ResourceExt};
13use sha2::{Digest, Sha256};
14use tracing::info;
15
16use crate::crd::{
17    ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_POLICY,
18    PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
19    PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
20    PostgresPolicyPlanStatus, SqlRef,
21};
22use crate::reconciler::ReconcileError;
23
24/// Result of plan creation — distinguishes genuinely new plans from
25/// deduplication hits so callers can decide whether to emit events.
26#[derive(Debug, Clone)]
27pub enum PlanCreationResult {
28    /// A new plan was created with the given name.
29    Created(String),
30    /// An existing plan with the same hash was found (deduplication).
31    Deduplicated(String),
32}
33
34impl PlanCreationResult {
35    /// Return the plan name regardless of variant.
36    pub fn plan_name(&self) -> &str {
37        match self {
38            PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
39        }
40    }
41
42    /// True when a new plan was actually created.
43    pub fn is_created(&self) -> bool {
44        matches!(self, PlanCreationResult::Created(_))
45    }
46}
47
48/// Maximum inline SQL size in plan status before spilling to a ConfigMap.
49const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;
50
51/// ConfigMap data key for the SQL content.
52const SQL_CONFIGMAP_KEY: &str = "plan.sql";
53
54/// Default maximum number of historical plans to retain per policy.
55const DEFAULT_MAX_PLANS: usize = 10;
56
57/// How recently a Failed plan must have been created (in seconds) for the
58/// dedup check to consider it a match. Plans older than this are ignored so
59/// that retries after the user fixes the environment are not blocked.
60const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
61
62// ---------------------------------------------------------------------------
63// Plan approval check
64// ---------------------------------------------------------------------------
65
66/// Result of checking a plan's approval annotations.
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum PlanApprovalState {
69    Pending,
70    Approved,
71    Rejected,
72}
73
74/// Check the approval state of a plan by inspecting its annotations.
75///
76/// Rejection takes priority over approval: if both annotations are set,
77/// the plan is considered rejected.
78pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
79    let annotations = plan.metadata.annotations.as_ref();
80
81    let rejected = annotations
82        .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
83        .map(|v| v == "true")
84        .unwrap_or(false);
85
86    if rejected {
87        return PlanApprovalState::Rejected;
88    }
89
90    let approved = annotations
91        .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
92        .map(|v| v == "true")
93        .unwrap_or(false);
94
95    if approved {
96        return PlanApprovalState::Approved;
97    }
98
99    PlanApprovalState::Pending
100}
101
102// ---------------------------------------------------------------------------
103// Plan creation
104// ---------------------------------------------------------------------------
105
106/// Create or deduplicate a `PostgresPolicyPlan` for the given policy and changes.
107///
108/// Returns the name of the plan resource (either existing or newly created).
109///
110/// This function:
111/// 1. Renders the full executable SQL from the changes
112/// 2. Computes SHA-256 of the full SQL (before any redaction/truncation)
113/// 3. Checks for an existing Pending plan with the same hash (dedup)
114/// 4. Marks any existing Pending plan with a different hash as Superseded
115/// 5. Creates the new plan resource with ownerReferences
116/// 6. Creates a ConfigMap for large SQL, or stores inline
117/// 7. Updates the plan status
118#[allow(clippy::too_many_arguments)]
119pub async fn create_or_update_plan(
120    client: &Client,
121    policy: &PostgresPolicy,
122    changes: &[pgroles_core::diff::Change],
123    sql_context: &pgroles_core::sql::SqlContext,
124    inspect_config: &pgroles_inspect::InspectConfig,
125    reconciliation_mode: CrdReconciliationMode,
126    database_identity: &str,
127    change_summary: &ChangeSummary,
128) -> Result<PlanCreationResult, ReconcileError> {
129    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
130    let policy_name = policy.name_any();
131    let generation = policy.metadata.generation.unwrap_or(0);
132
133    // 1. Render the full executable SQL (not redacted).
134    let full_sql = render_full_sql(changes, sql_context);
135
136    // 2. Compute SHA-256 hash of the full SQL.
137    let sql_hash = compute_sql_hash(&full_sql);
138
139    // 3. Count SQL statements (after wildcard expansion).
140    let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
141
142    // 4. Render redacted SQL for display (passwords masked).
143    let redacted_sql = render_redacted_sql(changes, sql_context);
144
145    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
146
147    // 4. List existing plans for this policy.
148    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
149    let existing_plans = plans_api
150        .list(&ListParams::default().labels(&label_selector))
151        .await?;
152
153    // 5. Check for duplicate pending plan with same hash.
154    for plan in &existing_plans {
155        if let Some(ref status) = plan.status
156            && status.phase == PlanPhase::Pending
157            && status.sql_hash.as_deref() == Some(&sql_hash)
158        {
159            // Identical plan already exists — return early (deduplicated).
160            let plan_name = plan.name_any();
161            info!(
162                plan = %plan_name,
163                policy = %policy_name,
164                "existing pending plan has identical SQL hash, skipping creation"
165            );
166            return Ok(PlanCreationResult::Deduplicated(plan_name));
167        }
168    }
169
170    // 5b. Check for recently-failed plan with the same hash. If a plan with
171    // this exact SQL already failed within the dedup window, creating another
172    // identical one is pointless — it would produce the same error. The window
173    // ensures we don't block retries after the user fixes the environment.
174    //
175    // Uses `status.failed_at` (not `creation_timestamp`) so that plans which
176    // waited for approval before failing are measured from the failure time.
177    let now_ts = now_epoch_secs();
178    for plan in &existing_plans {
179        if let Some(ref status) = plan.status
180            && status.phase == PlanPhase::Failed
181            && status.sql_hash.as_deref() == Some(&sql_hash)
182        {
183            let failed_ts = status
184                .failed_at
185                .as_deref()
186                .and_then(parse_rfc3339_epoch_secs)
187                .unwrap_or(0);
188            if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
189                let plan_name = plan.name_any();
190                info!(
191                    plan = %plan_name,
192                    policy = %policy_name,
193                    age_secs = now_ts - failed_ts,
194                    "recently-failed plan has identical SQL hash, skipping creation"
195                );
196                return Ok(PlanCreationResult::Deduplicated(plan_name));
197            }
198        }
199    }
200
201    // 6. Mark any existing Pending plans as Superseded.
202    for plan in &existing_plans {
203        if let Some(ref status) = plan.status
204            && status.phase == PlanPhase::Pending
205        {
206            let plan_name = plan.name_any();
207            info!(
208                plan = %plan_name,
209                policy = %policy_name,
210                "marking existing pending plan as Superseded"
211            );
212            let superseded_status = PostgresPolicyPlanStatus {
213                phase: PlanPhase::Superseded,
214                ..status.clone()
215            };
216            let patch = serde_json::json!({ "status": superseded_status });
217            plans_api
218                .patch_status(
219                    &plan_name,
220                    &PatchParams::apply("pgroles-operator"),
221                    &Patch::Merge(&patch),
222                )
223                .await?;
224        }
225    }
226
227    // 7. Generate a unique plan name using a timestamp.
228    let plan_name = generate_plan_name(&policy_name);
229
230    // 8. Build ownerReference pointing to the parent policy.
231    let owner_ref = build_owner_reference(policy);
232
233    // 9. Create the plan resource.
234    let plan = PostgresPolicyPlan::new(
235        &plan_name,
236        PostgresPolicyPlanSpec {
237            policy_ref: PolicyPlanRef {
238                name: policy_name.clone(),
239            },
240            policy_generation: generation,
241            reconciliation_mode,
242            owned_roles: inspect_config.managed_roles.clone(),
243            owned_schemas: inspect_config.managed_schemas.clone(),
244            managed_database_identity: database_identity.to_string(),
245        },
246    );
247    let mut plan = plan;
248    plan.metadata.namespace = Some(namespace.clone());
249    plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
250    plan.metadata.labels = Some(BTreeMap::from([
251        (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
252        (
253            LABEL_DATABASE_IDENTITY.to_string(),
254            sanitize_label_value(database_identity),
255        ),
256    ]));
257
258    // Annotations for quick visibility in kubectl describe / Lens.
259    let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
260    let summary_text = format!(
261        "{}R {}G {}D {}DP {}M",
262        change_summary.roles_created + change_summary.roles_altered,
263        change_summary.grants_added,
264        change_summary.default_privileges_set,
265        change_summary.roles_dropped,
266        change_summary.members_added,
267    );
268    plan.metadata.annotations = Some(BTreeMap::from([
269        ("pgroles.io/sql-preview".to_string(), sql_preview),
270        ("pgroles.io/summary".to_string(), summary_text),
271        (
272            "pgroles.io/sql-hash".to_string(),
273            sql_hash[..12].to_string(),
274        ),
275    ]));
276
277    let created_plan = plans_api.create(&PostParams::default(), &plan).await?;
278    let plan_name = created_plan.name_any();
279
280    // 10. Handle SQL storage: inline or ConfigMap.
281    let (sql_inline, sql_ref) = if redacted_sql.len() <= MAX_INLINE_SQL_BYTES {
282        (Some(redacted_sql), None)
283    } else {
284        // Create a ConfigMap for the full redacted SQL.
285        let configmap_name = format!("{plan_name}-sql");
286        let configmap = ConfigMap {
287            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
288                name: Some(configmap_name.clone()),
289                namespace: Some(namespace.clone()),
290                owner_references: Some(vec![build_plan_owner_reference(&created_plan)]),
291                labels: Some(BTreeMap::from([(
292                    LABEL_POLICY.to_string(),
293                    sanitize_label_value(&policy_name),
294                )])),
295                ..Default::default()
296            },
297            data: Some(BTreeMap::from([(
298                SQL_CONFIGMAP_KEY.to_string(),
299                redacted_sql,
300            )])),
301            ..Default::default()
302        };
303
304        let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
305        configmaps_api
306            .create(&PostParams::default(), &configmap)
307            .await?;
308
309        (
310            None,
311            Some(SqlRef {
312                name: configmap_name,
313                key: SQL_CONFIGMAP_KEY.to_string(),
314            }),
315        )
316    };
317
318    // 11. Update plan status.
319    let plan_status = PostgresPolicyPlanStatus {
320        phase: PlanPhase::Pending,
321        conditions: vec![
322            PolicyCondition {
323                condition_type: "Computed".to_string(),
324                status: "True".to_string(),
325                reason: Some("PlanComputed".to_string()),
326                message: Some(format!(
327                    "Plan computed with {} change(s)",
328                    change_summary.total
329                )),
330                last_transition_time: Some(crate::crd::now_rfc3339()),
331            },
332            PolicyCondition {
333                condition_type: "Approved".to_string(),
334                status: "False".to_string(),
335                reason: Some("PendingApproval".to_string()),
336                message: Some("Plan awaiting approval".to_string()),
337                last_transition_time: Some(crate::crd::now_rfc3339()),
338            },
339        ],
340        change_summary: Some(change_summary.clone()),
341        sql_ref,
342        sql_inline,
343        computed_at: Some(crate::crd::now_rfc3339()),
344        applied_at: None,
345        last_error: None,
346        sql_hash: Some(sql_hash),
347        applying_since: None,
348        failed_at: None,
349        sql_statements: Some(sql_statement_count),
350    };
351
352    let status_patch = serde_json::json!({ "status": plan_status });
353    plans_api
354        .patch_status(
355            &plan_name,
356            &PatchParams::apply("pgroles-operator"),
357            &Patch::Merge(&status_patch),
358        )
359        .await?;
360
361    info!(
362        plan = %plan_name,
363        policy = %policy_name,
364        changes = change_summary.total,
365        "created new plan"
366    );
367
368    Ok(PlanCreationResult::Created(plan_name))
369}
370
371// ---------------------------------------------------------------------------
372// Plan execution
373// ---------------------------------------------------------------------------
374
375/// Execute an approved plan against the database.
376///
377/// Reads SQL from inline status or the referenced ConfigMap, executes it in
378/// a transaction, and updates the plan status to Applied or Failed.
379pub async fn execute_plan(
380    client: &Client,
381    plan: &PostgresPolicyPlan,
382    pool: &sqlx::PgPool,
383    sql_context: &pgroles_core::sql::SqlContext,
384    changes: &[pgroles_core::diff::Change],
385) -> Result<(), ReconcileError> {
386    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
387    let plan_name = plan.name_any();
388    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
389
390    // Update phase to Applying.
391    update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
392
393    // Execute the SQL in a transaction using the original changes (not stored SQL).
394    // This ensures we use the actual executable SQL including real passwords,
395    // not the redacted version stored in the plan.
396    let result = execute_changes_in_transaction(pool, changes, sql_context).await;
397
398    match result {
399        Ok(statements_executed) => {
400            // Update plan status to Applied.
401            let mut applied_status = plan.status.clone().unwrap_or_default();
402            applied_status.phase = PlanPhase::Applied;
403            applied_status.applied_at = Some(crate::crd::now_rfc3339());
404            applied_status.last_error = None;
405            set_plan_condition(
406                &mut applied_status.conditions,
407                "Approved",
408                "True",
409                "Approved",
410                "Plan approved and executed",
411            );
412
413            let patch = serde_json::json!({ "status": applied_status });
414            plans_api
415                .patch_status(
416                    &plan_name,
417                    &PatchParams::apply("pgroles-operator"),
418                    &Patch::Merge(&patch),
419                )
420                .await?;
421
422            info!(
423                plan = %plan_name,
424                statements = statements_executed,
425                "plan executed successfully"
426            );
427            Ok(())
428        }
429        Err(err) => {
430            // Update plan status to Failed.
431            let error_message = err.to_string();
432            let mut failed_status = plan.status.clone().unwrap_or_default();
433            failed_status.phase = PlanPhase::Failed;
434            failed_status.last_error = Some(error_message);
435            failed_status.failed_at = Some(crate::crd::now_rfc3339());
436
437            let patch = serde_json::json!({ "status": failed_status });
438            if let Err(status_err) = plans_api
439                .patch_status(
440                    &plan_name,
441                    &PatchParams::apply("pgroles-operator"),
442                    &Patch::Merge(&patch),
443                )
444                .await
445            {
446                tracing::warn!(
447                    plan = %plan_name,
448                    %status_err,
449                    "failed to update plan status to Failed"
450                );
451            }
452
453            Err(err)
454        }
455    }
456}
457
458/// Execute SQL changes in a database transaction.
459///
460/// Returns the number of statements executed on success.
461async fn execute_changes_in_transaction(
462    pool: &sqlx::PgPool,
463    changes: &[pgroles_core::diff::Change],
464    sql_context: &pgroles_core::sql::SqlContext,
465) -> Result<usize, ReconcileError> {
466    let mut transaction = pool.begin().await?;
467    let mut statements_executed = 0usize;
468
469    for change in changes {
470        let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
471        for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
472            if is_sensitive {
473                tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
474            } else {
475                tracing::debug!(%sql, "executing");
476            }
477            sqlx::query(&sql).execute(transaction.as_mut()).await?;
478            statements_executed += 1;
479        }
480    }
481
482    transaction.commit().await?;
483    Ok(statements_executed)
484}
485
486// ---------------------------------------------------------------------------
487// Plan cleanup / retention
488// ---------------------------------------------------------------------------
489
490/// Clean up old plans for a policy, retaining at most `max_plans` terminal plans.
491///
492/// Terminal plans are those in Applied, Failed, Superseded, or Rejected phase.
493/// Pending and Approved plans are never cleaned up by this function.
494pub async fn cleanup_old_plans(
495    client: &Client,
496    policy: &PostgresPolicy,
497    max_plans: Option<usize>,
498) -> Result<(), ReconcileError> {
499    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
500    let policy_name = policy.name_any();
501    let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
502
503    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
504    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
505    let existing_plans = plans_api
506        .list(&ListParams::default().labels(&label_selector))
507        .await?;
508
509    // Collect terminal plans sorted by creation timestamp (oldest first).
510    let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
511        .iter()
512        .filter(|plan| {
513            plan.status
514                .as_ref()
515                .map(|s| {
516                    matches!(
517                        s.phase,
518                        PlanPhase::Applied
519                            | PlanPhase::Failed
520                            | PlanPhase::Superseded
521                            | PlanPhase::Rejected
522                    )
523                })
524                .unwrap_or(false)
525        })
526        .collect();
527
528    if terminal_plans.len() <= max_plans {
529        return Ok(());
530    }
531
532    // Sort by creation timestamp ascending (oldest first).
533    terminal_plans.sort_by(|a, b| {
534        let a_time = a.metadata.creation_timestamp.as_ref();
535        let b_time = b.metadata.creation_timestamp.as_ref();
536        a_time.cmp(&b_time)
537    });
538
539    let plans_to_delete = terminal_plans.len() - max_plans;
540    for plan in terminal_plans.into_iter().take(plans_to_delete) {
541        let plan_name = plan.name_any();
542        info!(
543            plan = %plan_name,
544            policy = %policy_name,
545            "cleaning up old plan"
546        );
547        if let Err(err) = plans_api.delete(&plan_name, &Default::default()).await {
548            tracing::warn!(
549                plan = %plan_name,
550                %err,
551                "failed to delete old plan during cleanup"
552            );
553        }
554    }
555
556    Ok(())
557}
558
559// ---------------------------------------------------------------------------
560// Helpers
561// ---------------------------------------------------------------------------
562
563/// Render the full executable SQL from changes (including real passwords).
564pub(crate) fn render_full_sql(
565    changes: &[pgroles_core::diff::Change],
566    sql_context: &pgroles_core::sql::SqlContext,
567) -> String {
568    changes
569        .iter()
570        .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
571        .collect::<Vec<_>>()
572        .join("\n")
573}
574
575/// Render redacted SQL for display (passwords replaced with [REDACTED]).
576fn render_redacted_sql(
577    changes: &[pgroles_core::diff::Change],
578    sql_context: &pgroles_core::sql::SqlContext,
579) -> String {
580    changes
581        .iter()
582        .flat_map(|change| {
583            if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
584                vec![format!(
585                    "ALTER ROLE {} PASSWORD '[REDACTED]';",
586                    pgroles_core::sql::quote_ident(name)
587                )]
588            } else {
589                pgroles_core::sql::render_statements_with_context(change, sql_context)
590            }
591        })
592        .collect::<Vec<_>>()
593        .join("\n")
594}
595
596/// Compute SHA-256 hash of the SQL string as a hex digest.
597pub(crate) fn compute_sql_hash(sql: &str) -> String {
598    let mut hasher = Sha256::new();
599    hasher.update(sql.as_bytes());
600    format!("{:x}", hasher.finalize())
601}
602
603/// Generate a plan name from policy name and current timestamp.
604///
605/// Format: `{policy-name}-plan-{YYYYMMDD-HHMMSS}-{millis}{random}`
606///
607/// A millisecond and random suffix is appended to avoid collisions when the
608/// operator retries within the same second.
609fn generate_plan_name(policy_name: &str) -> String {
610    let now = std::time::SystemTime::now()
611        .duration_since(std::time::UNIX_EPOCH)
612        .unwrap_or_default();
613    let timestamp = format_timestamp_compact();
614    let millis = now.subsec_millis();
615    let random_suffix: u32 = rand::random::<u32>() % 1000;
616    let suffix = format!("{millis:03}{random_suffix:03}");
617    // Kubernetes names must be <= 253 chars and DNS-compatible.
618    // Reserve 4 chars for the potential "-sql" ConfigMap suffix.
619    let max_name_len = 253 - 4; // 249
620    let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
621    let prefix = if policy_name.len() > max_prefix_len {
622        &policy_name[..max_prefix_len]
623    } else {
624        policy_name
625    };
626    format!("{prefix}-plan-{timestamp}-{suffix}")
627}
628
629/// Format the current UTC time as `YYYYMMDD-HHMMSS`.
630fn format_timestamp_compact() -> String {
631    use std::time::SystemTime;
632    let now = SystemTime::now()
633        .duration_since(SystemTime::UNIX_EPOCH)
634        .unwrap_or_default();
635    let secs = now.as_secs();
636    let (year, month, day) = crate::crd::days_to_date(secs / 86400);
637    let remaining = secs % 86400;
638    let hours = remaining / 3600;
639    let minutes = (remaining % 3600) / 60;
640    let seconds = remaining % 60;
641    format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
642}
643
644/// Sanitize a string for use as a Kubernetes label value.
645///
646/// Label values must be <= 63 chars and match `[a-z0-9A-Z._-]*`.
647fn sanitize_label_value(value: &str) -> String {
648    let sanitized: String = value
649        .chars()
650        .map(|c| {
651            if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
652                c
653            } else {
654                '_'
655            }
656        })
657        .take(63)
658        .collect();
659    sanitized
660}
661
662/// Current time as Unix epoch seconds (for dedup window checks).
663fn now_epoch_secs() -> i64 {
664    std::time::SystemTime::now()
665        .duration_since(std::time::UNIX_EPOCH)
666        .unwrap_or_default()
667        .as_secs() as i64
668}
669
670/// Parse an RFC 3339 timestamp string to Unix epoch seconds.
671/// Returns `None` if parsing fails.
672fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
673    // Use jiff (already a transitive dep via k8s-openapi) for RFC 3339 parsing.
674    rfc3339
675        .parse::<jiff::Timestamp>()
676        .ok()
677        .map(|t| t.as_second())
678}
679
680/// Build an OwnerReference pointing from a plan to its parent policy.
681fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
682    OwnerReference {
683        api_version: PostgresPolicy::api_version(&()).to_string(),
684        kind: PostgresPolicy::kind(&()).to_string(),
685        name: policy.name_any(),
686        uid: policy.metadata.uid.clone().unwrap_or_default(),
687        controller: Some(true),
688        block_owner_deletion: Some(true),
689    }
690}
691
692/// Build an OwnerReference pointing from a ConfigMap to its parent plan.
693fn build_plan_owner_reference(plan: &PostgresPolicyPlan) -> OwnerReference {
694    OwnerReference {
695        api_version: PostgresPolicyPlan::api_version(&()).to_string(),
696        kind: PostgresPolicyPlan::kind(&()).to_string(),
697        name: plan.name_any(),
698        uid: plan.metadata.uid.clone().unwrap_or_default(),
699        controller: Some(true),
700        block_owner_deletion: Some(true),
701    }
702}
703
704/// Update the phase field on a plan's status.
705///
706/// When transitioning to `Applying`, also sets `applying_since` for stuck
707/// plan detection.
708async fn update_plan_phase(
709    plans_api: &Api<PostgresPolicyPlan>,
710    plan_name: &str,
711    phase: PlanPhase,
712) -> Result<(), ReconcileError> {
713    let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
714    if phase == PlanPhase::Applying {
715        patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
716    }
717    plans_api
718        .patch_status(
719            plan_name,
720            &PatchParams::apply("pgroles-operator"),
721            &Patch::Merge(&patch_value),
722        )
723        .await?;
724    Ok(())
725}
726
727/// Set or update a condition in a conditions list.
728///
729/// Preserves `last_transition_time` when the status value is unchanged
730/// (only reason/message changed), matching Kubernetes condition conventions.
731fn set_plan_condition(
732    conditions: &mut Vec<PolicyCondition>,
733    condition_type: &str,
734    status: &str,
735    reason: &str,
736    message: &str,
737) {
738    let transition_time = if let Some(existing) = conditions
739        .iter()
740        .find(|c| c.condition_type == condition_type)
741    {
742        if existing.status == status {
743            existing.last_transition_time.clone()
744        } else {
745            Some(crate::crd::now_rfc3339())
746        }
747    } else {
748        Some(crate::crd::now_rfc3339())
749    };
750
751    let condition = PolicyCondition {
752        condition_type: condition_type.to_string(),
753        status: status.to_string(),
754        reason: Some(reason.to_string()),
755        message: Some(message.to_string()),
756        last_transition_time: transition_time,
757    };
758    if let Some(existing) = conditions
759        .iter_mut()
760        .find(|c| c.condition_type == condition_type)
761    {
762        *existing = condition;
763    } else {
764        conditions.push(condition);
765    }
766}
767
768/// Update the parent policy's `current_plan_ref` in status.
769pub async fn update_policy_plan_ref(
770    client: &Client,
771    policy: &PostgresPolicy,
772    plan_name: &str,
773) -> Result<(), ReconcileError> {
774    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
775    let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
776
777    let patch = serde_json::json!({
778        "status": {
779            "current_plan_ref": PlanReference {
780                name: plan_name.to_string(),
781            }
782        }
783    });
784
785    policy_api
786        .patch_status(
787            &policy.name_any(),
788            &PatchParams::apply("pgroles-operator"),
789            &Patch::Merge(&patch),
790        )
791        .await?;
792
793    Ok(())
794}
795
796/// Look up the current actionable plan for a policy, if any.
797///
798/// An actionable plan is one in `Pending` or `Approved` phase — i.e. a plan
799/// that the reconciler should evaluate for approval/execution.
800pub async fn get_current_actionable_plan(
801    client: &Client,
802    policy: &PostgresPolicy,
803) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
804    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
805    let policy_name = policy.name_any();
806
807    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
808    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
809    let existing_plans = plans_api
810        .list(&ListParams::default().labels(&label_selector))
811        .await?;
812
813    // Find the most recent actionable plan (Pending or Approved, by creation time).
814    let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
815        .into_iter()
816        .filter(|plan| {
817            plan.status
818                .as_ref()
819                .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
820                .unwrap_or(false)
821        })
822        .collect();
823
824    pending_plans.sort_by(|a, b| {
825        let a_time = a.metadata.creation_timestamp.as_ref();
826        let b_time = b.metadata.creation_timestamp.as_ref();
827        b_time.cmp(&a_time) // newest first
828    });
829
830    Ok(pending_plans.into_iter().next())
831}
832
833/// Look up the most recent plan for a policy in a given phase.
834pub async fn get_plan_by_phase(
835    client: &Client,
836    policy: &PostgresPolicy,
837    target_phase: PlanPhase,
838) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
839    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
840    let policy_name = policy.name_any();
841
842    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
843    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
844    let existing_plans = plans_api
845        .list(&ListParams::default().labels(&label_selector))
846        .await?;
847
848    let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
849        .into_iter()
850        .filter(|plan| {
851            plan.status
852                .as_ref()
853                .map(|s| s.phase == target_phase)
854                .unwrap_or(false)
855        })
856        .collect();
857
858    matching_plans.sort_by(|a, b| {
859        let a_time = a.metadata.creation_timestamp.as_ref();
860        let b_time = b.metadata.creation_timestamp.as_ref();
861        b_time.cmp(&a_time) // newest first
862    });
863
864    Ok(matching_plans.into_iter().next())
865}
866
867/// Mark a plan as Failed with a given error message.
868pub async fn mark_plan_failed(
869    client: &Client,
870    plan: &PostgresPolicyPlan,
871    error_message: &str,
872) -> Result<(), ReconcileError> {
873    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
874    let plan_name = plan.name_any();
875    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
876
877    let mut status = plan.status.clone().unwrap_or_default();
878    status.phase = PlanPhase::Failed;
879    status.last_error = Some(error_message.to_string());
880    status.failed_at = Some(crate::crd::now_rfc3339());
881
882    let patch = serde_json::json!({ "status": status });
883    plans_api
884        .patch_status(
885            &plan_name,
886            &PatchParams::apply("pgroles-operator"),
887            &Patch::Merge(&patch),
888        )
889        .await?;
890
891    info!(
892        plan = %plan_name,
893        "marked stuck Applying plan as Failed"
894    );
895
896    Ok(())
897}
898
899/// Mark a plan as Approved.
900///
901/// Callers provide `reason` and `message` to distinguish auto-approval from
902/// manual approval in the plan's conditions.
903pub async fn mark_plan_approved(
904    client: &Client,
905    plan: &PostgresPolicyPlan,
906    reason: &str,
907    message: &str,
908) -> Result<(), ReconcileError> {
909    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
910    let plan_name = plan.name_any();
911    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
912
913    let mut status = plan.status.clone().unwrap_or_default();
914    status.phase = PlanPhase::Approved;
915    set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
916
917    let patch = serde_json::json!({ "status": status });
918    plans_api
919        .patch_status(
920            &plan_name,
921            &PatchParams::apply("pgroles-operator"),
922            &Patch::Merge(&patch),
923        )
924        .await?;
925
926    Ok(())
927}
928
929/// Mark a plan as Rejected.
930pub async fn mark_plan_rejected(
931    client: &Client,
932    plan: &PostgresPolicyPlan,
933) -> Result<(), ReconcileError> {
934    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
935    let plan_name = plan.name_any();
936    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
937
938    let mut status = plan.status.clone().unwrap_or_default();
939    status.phase = PlanPhase::Rejected;
940    set_plan_condition(
941        &mut status.conditions,
942        "Approved",
943        "False",
944        "Rejected",
945        "Plan rejected via annotation",
946    );
947
948    let patch = serde_json::json!({ "status": status });
949    plans_api
950        .patch_status(
951            &plan_name,
952            &PatchParams::apply("pgroles-operator"),
953            &Patch::Merge(&patch),
954        )
955        .await?;
956
957    Ok(())
958}
959
960/// Mark a plan as Superseded (database state changed since approval).
961pub async fn mark_plan_superseded(
962    client: &Client,
963    plan: &PostgresPolicyPlan,
964) -> Result<(), ReconcileError> {
965    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
966    let plan_name = plan.name_any();
967    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
968
969    let mut status = plan.status.clone().unwrap_or_default();
970    status.phase = PlanPhase::Superseded;
971    set_plan_condition(
972        &mut status.conditions,
973        "Approved",
974        "False",
975        "Superseded",
976        "Database state changed since plan was approved",
977    );
978
979    let patch = serde_json::json!({ "status": status });
980    plans_api
981        .patch_status(
982            &plan_name,
983            &PatchParams::apply("pgroles-operator"),
984            &Patch::Merge(&patch),
985        )
986        .await?;
987
988    Ok(())
989}
990
991// ---------------------------------------------------------------------------
992// Tests
993// ---------------------------------------------------------------------------
994
995#[cfg(test)]
996mod tests {
997    use super::*;
998    use crate::crd::CrdReconciliationMode;
999
1000    fn test_plan(
1001        name: &str,
1002        phase: PlanPhase,
1003        annotations: Option<BTreeMap<String, String>>,
1004    ) -> PostgresPolicyPlan {
1005        let mut plan = PostgresPolicyPlan::new(
1006            name,
1007            PostgresPolicyPlanSpec {
1008                policy_ref: PolicyPlanRef {
1009                    name: "test-policy".to_string(),
1010                },
1011                policy_generation: 1,
1012                reconciliation_mode: CrdReconciliationMode::Authoritative,
1013                owned_roles: vec!["role-a".to_string()],
1014                owned_schemas: vec!["public".to_string()],
1015                managed_database_identity: "default/db/DATABASE_URL".to_string(),
1016            },
1017        );
1018        plan.metadata.namespace = Some("default".to_string());
1019        plan.metadata.annotations = annotations;
1020        plan.status = Some(PostgresPolicyPlanStatus {
1021            phase,
1022            ..Default::default()
1023        });
1024        plan
1025    }
1026
1027    #[test]
1028    fn check_plan_approval_pending_when_no_annotations() {
1029        let plan = test_plan("plan-1", PlanPhase::Pending, None);
1030        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1031    }
1032
1033    #[test]
1034    fn check_plan_approval_approved_with_annotation() {
1035        let annotations =
1036            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1037        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1038        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1039    }
1040
1041    #[test]
1042    fn check_plan_approval_rejected_with_annotation() {
1043        let annotations =
1044            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1045        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1046        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1047    }
1048
1049    #[test]
1050    fn check_plan_approval_rejected_wins_over_approved() {
1051        let annotations = BTreeMap::from([
1052            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1053            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1054        ]);
1055        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1056        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1057    }
1058
1059    #[test]
1060    fn check_plan_approval_non_true_value_is_pending() {
1061        let annotations =
1062            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1063        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1064        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1065    }
1066
1067    #[test]
1068    fn compute_sql_hash_is_deterministic() {
1069        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1070        let hash1 = compute_sql_hash(sql);
1071        let hash2 = compute_sql_hash(sql);
1072        assert_eq!(hash1, hash2);
1073        assert_eq!(hash1.len(), 64); // SHA-256 hex digest is 64 chars
1074    }
1075
1076    #[test]
1077    fn compute_sql_hash_differs_for_different_sql() {
1078        let hash1 = compute_sql_hash("CREATE ROLE a;");
1079        let hash2 = compute_sql_hash("CREATE ROLE b;");
1080        assert_ne!(hash1, hash2);
1081    }
1082
1083    #[test]
1084    fn generate_plan_name_has_expected_format() {
1085        let name = generate_plan_name("my-policy");
1086        assert!(name.starts_with("my-policy-plan-"));
1087        // Should be "my-policy-plan-YYYYMMDD-HHMMSS-MMMRRR"
1088        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1089        // YYYYMMDD-HHMMSS-MMMRRR = 15 + 1 + 6 = 22 chars
1090        assert_eq!(suffix.len(), 22);
1091        assert_eq!(&suffix[8..9], "-");
1092        assert_eq!(&suffix[15..16], "-");
1093    }
1094
1095    #[test]
1096    fn generate_plan_name_is_unique_across_calls() {
1097        let name1 = generate_plan_name("my-policy");
1098        let name2 = generate_plan_name("my-policy");
1099        // With millisecond + random suffix, collisions are extremely unlikely
1100        // (this test may very rarely fail, but demonstrates the intent).
1101        assert_ne!(name1, name2);
1102    }
1103
1104    #[test]
1105    fn sanitize_label_value_replaces_slashes() {
1106        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1107        assert!(!sanitized.contains('/'));
1108        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1109    }
1110
1111    #[test]
1112    fn sanitize_label_value_truncates_to_63_chars() {
1113        let long_value = "a".repeat(100);
1114        let sanitized = sanitize_label_value(&long_value);
1115        assert!(sanitized.len() <= 63);
1116    }
1117
1118    #[test]
1119    fn render_redacted_sql_masks_passwords() {
1120        let changes = vec![
1121            pgroles_core::diff::Change::CreateRole {
1122                name: "app".to_string(),
1123                state: pgroles_core::model::RoleState {
1124                    login: true,
1125                    ..pgroles_core::model::RoleState::default()
1126                },
1127            },
1128            pgroles_core::diff::Change::SetPassword {
1129                name: "app".to_string(),
1130                password: "super_secret".to_string(),
1131            },
1132        ];
1133        let ctx = pgroles_core::sql::SqlContext::default();
1134        let redacted = render_redacted_sql(&changes, &ctx);
1135
1136        assert!(redacted.contains("[REDACTED]"));
1137        assert!(!redacted.contains("super_secret"));
1138        assert!(redacted.contains("CREATE ROLE"));
1139    }
1140
1141    #[test]
1142    fn render_full_sql_includes_passwords() {
1143        let changes = vec![pgroles_core::diff::Change::SetPassword {
1144            name: "app".to_string(),
1145            password: "super_secret".to_string(),
1146        }];
1147        let ctx = pgroles_core::sql::SqlContext::default();
1148        let full = render_full_sql(&changes, &ctx);
1149
1150        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1151    }
1152
1153    #[test]
1154    fn now_epoch_secs_returns_plausible_value() {
1155        let now = now_epoch_secs();
1156        // Should be after 2025-01-01 and before 2100-01-01.
1157        let y2025 = 1_735_689_600_i64;
1158        let y2100 = 4_102_444_800_i64;
1159        assert!(
1160            now > y2025 && now < y2100,
1161            "epoch secs {now} should be between 2025 and 2100"
1162        );
1163    }
1164}