Skip to main content

pgroles_operator/
plan.rs

1//! Plan lifecycle management for `PostgresPolicyPlan` resources.
2//!
3//! Handles creating, deduplicating, approving, executing, and cleaning up
4//! reconciliation plans. Plans represent computed SQL change sets that may
5//! require explicit approval before execution against a database.
6
7use std::collections::BTreeMap;
8
9use k8s_openapi::api::core::v1::ConfigMap;
10use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
11use kube::api::{Api, ListParams, Patch, PatchParams, PostParams};
12use kube::{Client, Resource, ResourceExt};
13use sha2::{Digest, Sha256};
14use tracing::info;
15
16use crate::crd::{
17    ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_POLICY,
18    PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
19    PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
20    PostgresPolicyPlanStatus, SqlRef,
21};
22use crate::reconciler::ReconcileError;
23
24/// Result of plan creation — distinguishes genuinely new plans from
25/// deduplication hits so callers can decide whether to emit events.
26#[derive(Debug, Clone)]
27pub enum PlanCreationResult {
28    /// A new plan was created with the given name.
29    Created(String),
30    /// An existing plan with the same hash was found (deduplication).
31    Deduplicated(String),
32}
33
34impl PlanCreationResult {
35    /// Return the plan name regardless of variant.
36    pub fn plan_name(&self) -> &str {
37        match self {
38            PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
39        }
40    }
41
42    /// True when a new plan was actually created.
43    pub fn is_created(&self) -> bool {
44        matches!(self, PlanCreationResult::Created(_))
45    }
46}
47
48/// Maximum inline SQL size in plan status before spilling to a ConfigMap.
49const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;
50
51/// ConfigMap data key for the SQL content.
52const SQL_CONFIGMAP_KEY: &str = "plan.sql";
53
54/// Default maximum number of historical plans to retain per policy.
55const DEFAULT_MAX_PLANS: usize = 10;
56
57/// How recently a Failed plan must have been created (in seconds) for the
58/// dedup check to consider it a match. Plans older than this are ignored so
59/// that retries after the user fixes the environment are not blocked.
60const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
61
62// ---------------------------------------------------------------------------
63// Plan approval check
64// ---------------------------------------------------------------------------
65
66/// Result of checking a plan's approval annotations.
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum PlanApprovalState {
69    Pending,
70    Approved,
71    Rejected,
72}
73
74/// Check the approval state of a plan by inspecting its annotations.
75///
76/// Rejection takes priority over approval: if both annotations are set,
77/// the plan is considered rejected.
78pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
79    let annotations = plan.metadata.annotations.as_ref();
80
81    let rejected = annotations
82        .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
83        .map(|v| v == "true")
84        .unwrap_or(false);
85
86    if rejected {
87        return PlanApprovalState::Rejected;
88    }
89
90    let approved = annotations
91        .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
92        .map(|v| v == "true")
93        .unwrap_or(false);
94
95    if approved {
96        return PlanApprovalState::Approved;
97    }
98
99    PlanApprovalState::Pending
100}
101
102// ---------------------------------------------------------------------------
103// Plan creation
104// ---------------------------------------------------------------------------
105
106/// Create or deduplicate a `PostgresPolicyPlan` for the given policy and changes.
107///
108/// Returns the name of the plan resource (either existing or newly created).
109///
110/// This function:
111/// 1. Renders the full executable SQL from the changes
112/// 2. Computes SHA-256 of the full SQL (before any redaction/truncation)
113/// 3. Checks for an existing Pending plan with the same hash (dedup)
114/// 4. Marks any existing Pending plan with a different hash as Superseded
115/// 5. Creates the new plan resource with ownerReferences
116/// 6. Creates a ConfigMap for large SQL, or stores inline
117/// 7. Updates the plan status
118#[allow(clippy::too_many_arguments)]
119pub async fn create_or_update_plan(
120    client: &Client,
121    policy: &PostgresPolicy,
122    changes: &[pgroles_core::diff::Change],
123    sql_context: &pgroles_core::sql::SqlContext,
124    inspect_config: &pgroles_inspect::InspectConfig,
125    reconciliation_mode: CrdReconciliationMode,
126    database_identity: &str,
127    change_summary: &ChangeSummary,
128) -> Result<PlanCreationResult, ReconcileError> {
129    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
130    let policy_name = policy.name_any();
131    let generation = policy.metadata.generation.unwrap_or(0);
132
133    // 1. Render the full executable SQL (not redacted).
134    let full_sql = render_full_sql(changes, sql_context);
135
136    // 2. Compute SHA-256 hash of the full SQL.
137    let sql_hash = compute_sql_hash(&full_sql);
138
139    // 3. Count SQL statements (after wildcard expansion).
140    let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
141
142    // 4. Render redacted SQL for display (passwords masked).
143    let redacted_sql = render_redacted_sql(changes, sql_context);
144
145    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
146
147    // 4. List existing plans for this policy.
148    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
149    let existing_plans = plans_api
150        .list(&ListParams::default().labels(&label_selector))
151        .await?;
152
153    // 5. Check for duplicate pending plan with same hash.
154    for plan in &existing_plans {
155        if let Some(ref status) = plan.status
156            && status.phase == PlanPhase::Pending
157            && status.sql_hash.as_deref() == Some(&sql_hash)
158        {
159            // Identical plan already exists — return early (deduplicated).
160            let plan_name = plan.name_any();
161            info!(
162                plan = %plan_name,
163                policy = %policy_name,
164                "existing pending plan has identical SQL hash, skipping creation"
165            );
166            return Ok(PlanCreationResult::Deduplicated(plan_name));
167        }
168    }
169
170    // 5b. Check for recently-failed plan with the same hash. If a plan with
171    // this exact SQL already failed within the dedup window, creating another
172    // identical one is pointless — it would produce the same error. The window
173    // ensures we don't block retries after the user fixes the environment.
174    //
175    // Uses `status.failed_at` (not `creation_timestamp`) so that plans which
176    // waited for approval before failing are measured from the failure time.
177    let now_ts = now_epoch_secs();
178    for plan in &existing_plans {
179        if let Some(ref status) = plan.status
180            && status.phase == PlanPhase::Failed
181            && status.sql_hash.as_deref() == Some(&sql_hash)
182        {
183            let failed_ts = status
184                .failed_at
185                .as_deref()
186                .and_then(parse_rfc3339_epoch_secs)
187                .unwrap_or(0);
188            if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
189                let plan_name = plan.name_any();
190                info!(
191                    plan = %plan_name,
192                    policy = %policy_name,
193                    age_secs = now_ts - failed_ts,
194                    "recently-failed plan has identical SQL hash, skipping creation"
195                );
196                return Ok(PlanCreationResult::Deduplicated(plan_name));
197            }
198        }
199    }
200
201    // 6. Mark any existing Pending plans as Superseded.
202    for plan in &existing_plans {
203        if let Some(ref status) = plan.status
204            && status.phase == PlanPhase::Pending
205        {
206            let plan_name = plan.name_any();
207            info!(
208                plan = %plan_name,
209                policy = %policy_name,
210                "marking existing pending plan as Superseded"
211            );
212            let superseded_status = PostgresPolicyPlanStatus {
213                phase: PlanPhase::Superseded,
214                ..status.clone()
215            };
216            let patch = serde_json::json!({ "status": superseded_status });
217            plans_api
218                .patch_status(
219                    &plan_name,
220                    &PatchParams::apply("pgroles-operator"),
221                    &Patch::Merge(&patch),
222                )
223                .await?;
224        }
225    }
226
227    // 7. Generate a unique plan name using a timestamp.
228    let plan_name = generate_plan_name(&policy_name);
229
230    // 8. Build ownerReference pointing to the parent policy.
231    let owner_ref = build_owner_reference(policy);
232
233    // 9. Create the plan resource.
234    let plan = PostgresPolicyPlan::new(
235        &plan_name,
236        PostgresPolicyPlanSpec {
237            policy_ref: PolicyPlanRef {
238                name: policy_name.clone(),
239            },
240            policy_generation: generation,
241            reconciliation_mode,
242            owned_roles: inspect_config.managed_roles.clone(),
243            owned_schemas: inspect_config.managed_schemas.clone(),
244            managed_database_identity: database_identity.to_string(),
245        },
246    );
247    let mut plan = plan;
248    plan.metadata.namespace = Some(namespace.clone());
249    plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
250    plan.metadata.labels = Some(BTreeMap::from([
251        (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
252        (
253            LABEL_DATABASE_IDENTITY.to_string(),
254            sanitize_label_value(database_identity),
255        ),
256    ]));
257
258    // Annotations for quick visibility in kubectl describe / Lens.
259    let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
260    let summary_text = format!(
261        "{}R {}G {}D {}DP {}M",
262        change_summary.roles_created + change_summary.roles_altered,
263        change_summary.grants_added,
264        change_summary.default_privileges_set,
265        change_summary.roles_dropped,
266        change_summary.members_added,
267    );
268    plan.metadata.annotations = Some(BTreeMap::from([
269        ("pgroles.io/sql-preview".to_string(), sql_preview),
270        ("pgroles.io/summary".to_string(), summary_text),
271        (
272            "pgroles.io/sql-hash".to_string(),
273            sql_hash[..12].to_string(),
274        ),
275    ]));
276
277    let created_plan = plans_api.create(&PostParams::default(), &plan).await?;
278    let plan_name = created_plan.name_any();
279
280    // 10. Handle SQL storage: inline or ConfigMap.
281    let (sql_inline, sql_ref) = if redacted_sql.len() <= MAX_INLINE_SQL_BYTES {
282        (Some(redacted_sql), None)
283    } else {
284        // Create a ConfigMap for the full redacted SQL.
285        let configmap_name = format!("{plan_name}-sql");
286        let configmap = ConfigMap {
287            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
288                name: Some(configmap_name.clone()),
289                namespace: Some(namespace.clone()),
290                owner_references: Some(vec![build_plan_owner_reference(&created_plan)]),
291                labels: Some(BTreeMap::from([(
292                    LABEL_POLICY.to_string(),
293                    sanitize_label_value(&policy_name),
294                )])),
295                ..Default::default()
296            },
297            data: Some(BTreeMap::from([(
298                SQL_CONFIGMAP_KEY.to_string(),
299                redacted_sql,
300            )])),
301            ..Default::default()
302        };
303
304        let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
305        configmaps_api
306            .create(&PostParams::default(), &configmap)
307            .await?;
308
309        (
310            None,
311            Some(SqlRef {
312                name: configmap_name,
313                key: SQL_CONFIGMAP_KEY.to_string(),
314            }),
315        )
316    };
317
318    // 11. Update plan status.
319    let plan_status = PostgresPolicyPlanStatus {
320        phase: PlanPhase::Pending,
321        conditions: vec![
322            PolicyCondition {
323                condition_type: "Computed".to_string(),
324                status: "True".to_string(),
325                reason: Some("PlanComputed".to_string()),
326                message: Some(format!(
327                    "Plan computed with {} change(s)",
328                    change_summary.total
329                )),
330                last_transition_time: Some(crate::crd::now_rfc3339()),
331            },
332            PolicyCondition {
333                condition_type: "Approved".to_string(),
334                status: "False".to_string(),
335                reason: Some("PendingApproval".to_string()),
336                message: Some("Plan awaiting approval".to_string()),
337                last_transition_time: Some(crate::crd::now_rfc3339()),
338            },
339        ],
340        change_summary: Some(change_summary.clone()),
341        sql_ref,
342        sql_inline,
343        computed_at: Some(crate::crd::now_rfc3339()),
344        applied_at: None,
345        last_error: None,
346        sql_hash: Some(sql_hash),
347        applying_since: None,
348        failed_at: None,
349        sql_statements: Some(sql_statement_count),
350    };
351
352    let status_patch = serde_json::json!({ "status": plan_status });
353    plans_api
354        .patch_status(
355            &plan_name,
356            &PatchParams::apply("pgroles-operator"),
357            &Patch::Merge(&status_patch),
358        )
359        .await?;
360
361    info!(
362        plan = %plan_name,
363        policy = %policy_name,
364        changes = change_summary.total,
365        "created new plan"
366    );
367
368    Ok(PlanCreationResult::Created(plan_name))
369}
370
371// ---------------------------------------------------------------------------
372// Plan execution
373// ---------------------------------------------------------------------------
374
375/// Execute an approved plan against the database.
376///
377/// Reads SQL from inline status or the referenced ConfigMap, executes it in
378/// a transaction, and updates the plan status to Applied or Failed.
379pub async fn execute_plan(
380    client: &Client,
381    plan: &PostgresPolicyPlan,
382    pool: &sqlx::PgPool,
383    sql_context: &pgroles_core::sql::SqlContext,
384    changes: &[pgroles_core::diff::Change],
385) -> Result<(), ReconcileError> {
386    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
387    let plan_name = plan.name_any();
388    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
389
390    // Update phase to Applying.
391    update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
392
393    // Execute the SQL in a transaction using the original changes (not stored SQL).
394    // This ensures we use the actual executable SQL including real passwords,
395    // not the redacted version stored in the plan.
396    let result = execute_changes_in_transaction(pool, changes, sql_context).await;
397
398    match result {
399        Ok(statements_executed) => {
400            // Update plan status to Applied.
401            let mut applied_status = plan.status.clone().unwrap_or_default();
402            applied_status.phase = PlanPhase::Applied;
403            applied_status.applied_at = Some(crate::crd::now_rfc3339());
404            applied_status.last_error = None;
405            set_plan_condition(
406                &mut applied_status.conditions,
407                "Approved",
408                "True",
409                "Approved",
410                "Plan approved and executed",
411            );
412
413            let patch = serde_json::json!({ "status": applied_status });
414            plans_api
415                .patch_status(
416                    &plan_name,
417                    &PatchParams::apply("pgroles-operator"),
418                    &Patch::Merge(&patch),
419                )
420                .await?;
421
422            info!(
423                plan = %plan_name,
424                statements = statements_executed,
425                "plan executed successfully"
426            );
427            Ok(())
428        }
429        Err(err) => {
430            // Update plan status to Failed.
431            let error_message = err.to_string();
432            let mut failed_status = plan.status.clone().unwrap_or_default();
433            failed_status.phase = PlanPhase::Failed;
434            failed_status.last_error = Some(error_message);
435            failed_status.failed_at = Some(crate::crd::now_rfc3339());
436
437            let patch = serde_json::json!({ "status": failed_status });
438            if let Err(status_err) = plans_api
439                .patch_status(
440                    &plan_name,
441                    &PatchParams::apply("pgroles-operator"),
442                    &Patch::Merge(&patch),
443                )
444                .await
445            {
446                tracing::warn!(
447                    plan = %plan_name,
448                    %status_err,
449                    "failed to update plan status to Failed"
450                );
451            }
452
453            Err(err)
454        }
455    }
456}
457
458/// Execute SQL changes in a database transaction.
459///
460/// Returns the number of statements executed on success.
461async fn execute_changes_in_transaction(
462    pool: &sqlx::PgPool,
463    changes: &[pgroles_core::diff::Change],
464    sql_context: &pgroles_core::sql::SqlContext,
465) -> Result<usize, ReconcileError> {
466    let mut transaction = pool.begin().await?;
467    let mut statements_executed = 0usize;
468
469    for change in changes {
470        let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
471        for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
472            if is_sensitive {
473                tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
474            } else {
475                tracing::debug!(%sql, "executing");
476            }
477            sqlx::query(&sql).execute(transaction.as_mut()).await?;
478            statements_executed += 1;
479        }
480    }
481
482    transaction.commit().await?;
483    Ok(statements_executed)
484}
485
486// ---------------------------------------------------------------------------
487// Plan cleanup / retention
488// ---------------------------------------------------------------------------
489
490/// Clean up old plans for a policy, retaining at most `max_plans` terminal plans.
491///
492/// Terminal plans are those in Applied, Failed, Superseded, or Rejected phase.
493/// Pending and Approved plans are never cleaned up by this function.
494pub async fn cleanup_old_plans(
495    client: &Client,
496    policy: &PostgresPolicy,
497    max_plans: Option<usize>,
498) -> Result<(), ReconcileError> {
499    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
500    let policy_name = policy.name_any();
501    let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
502
503    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
504    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
505    let existing_plans = plans_api
506        .list(&ListParams::default().labels(&label_selector))
507        .await?;
508
509    // Collect terminal plans sorted by creation timestamp (oldest first).
510    let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
511        .iter()
512        .filter(|plan| {
513            plan.status
514                .as_ref()
515                .map(|s| {
516                    matches!(
517                        s.phase,
518                        PlanPhase::Applied
519                            | PlanPhase::Failed
520                            | PlanPhase::Superseded
521                            | PlanPhase::Rejected
522                    )
523                })
524                .unwrap_or(false)
525        })
526        .collect();
527
528    if terminal_plans.len() <= max_plans {
529        return Ok(());
530    }
531
532    // Sort by creation timestamp ascending (oldest first).
533    terminal_plans.sort_by(|a, b| {
534        let a_time = a.metadata.creation_timestamp.as_ref();
535        let b_time = b.metadata.creation_timestamp.as_ref();
536        a_time.cmp(&b_time)
537    });
538
539    let plans_to_delete = terminal_plans.len() - max_plans;
540    for plan in terminal_plans.into_iter().take(plans_to_delete) {
541        let plan_name = plan.name_any();
542        info!(
543            plan = %plan_name,
544            policy = %policy_name,
545            "cleaning up old plan"
546        );
547        if let Err(err) = plans_api.delete(&plan_name, &Default::default()).await {
548            tracing::warn!(
549                plan = %plan_name,
550                %err,
551                "failed to delete old plan during cleanup"
552            );
553        }
554    }
555
556    Ok(())
557}
558
559// ---------------------------------------------------------------------------
560// Helpers
561// ---------------------------------------------------------------------------
562
563/// Render the full executable SQL from changes (including real passwords).
564pub(crate) fn render_full_sql(
565    changes: &[pgroles_core::diff::Change],
566    sql_context: &pgroles_core::sql::SqlContext,
567) -> String {
568    changes
569        .iter()
570        .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
571        .collect::<Vec<_>>()
572        .join("\n")
573}
574
575/// Render redacted SQL for display (passwords replaced with [REDACTED]).
576fn render_redacted_sql(
577    changes: &[pgroles_core::diff::Change],
578    sql_context: &pgroles_core::sql::SqlContext,
579) -> String {
580    changes
581        .iter()
582        .flat_map(|change| {
583            if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
584                vec![format!(
585                    "ALTER ROLE {} PASSWORD '[REDACTED]';",
586                    pgroles_core::sql::quote_ident(name)
587                )]
588            } else {
589                pgroles_core::sql::render_statements_with_context(change, sql_context)
590            }
591        })
592        .collect::<Vec<_>>()
593        .join("\n")
594}
595
596/// Compute SHA-256 hash of the SQL string as a hex digest.
597pub(crate) fn compute_sql_hash(sql: &str) -> String {
598    use std::fmt::Write as _;
599
600    let mut hasher = Sha256::new();
601    hasher.update(sql.as_bytes());
602    let digest = hasher.finalize();
603    let mut hex = String::with_capacity(digest.len() * 2);
604    for byte in digest {
605        write!(&mut hex, "{byte:02x}").expect("writing to a string should succeed");
606    }
607    hex
608}
609
610/// Generate a plan name from policy name and current timestamp.
611///
612/// Format: `{policy-name}-plan-{YYYYMMDD-HHMMSS}-{millis}{random}`
613///
614/// A millisecond and random suffix is appended to avoid collisions when the
615/// operator retries within the same second.
616fn generate_plan_name(policy_name: &str) -> String {
617    let now = std::time::SystemTime::now()
618        .duration_since(std::time::UNIX_EPOCH)
619        .unwrap_or_default();
620    let timestamp = format_timestamp_compact();
621    let millis = now.subsec_millis();
622    let random_suffix: u32 = rand::random::<u32>() % 1000;
623    let suffix = format!("{millis:03}{random_suffix:03}");
624    // Kubernetes names must be <= 253 chars and DNS-compatible.
625    // Reserve 4 chars for the potential "-sql" ConfigMap suffix.
626    let max_name_len = 253 - 4; // 249
627    let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
628    let prefix = if policy_name.len() > max_prefix_len {
629        &policy_name[..max_prefix_len]
630    } else {
631        policy_name
632    };
633    format!("{prefix}-plan-{timestamp}-{suffix}")
634}
635
636/// Format the current UTC time as `YYYYMMDD-HHMMSS`.
637fn format_timestamp_compact() -> String {
638    use std::time::SystemTime;
639    let now = SystemTime::now()
640        .duration_since(SystemTime::UNIX_EPOCH)
641        .unwrap_or_default();
642    let secs = now.as_secs();
643    let (year, month, day) = crate::crd::days_to_date(secs / 86400);
644    let remaining = secs % 86400;
645    let hours = remaining / 3600;
646    let minutes = (remaining % 3600) / 60;
647    let seconds = remaining % 60;
648    format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
649}
650
651/// Sanitize a string for use as a Kubernetes label value.
652///
653/// Label values must be <= 63 chars and match `[a-z0-9A-Z._-]*`.
654fn sanitize_label_value(value: &str) -> String {
655    let sanitized: String = value
656        .chars()
657        .map(|c| {
658            if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
659                c
660            } else {
661                '_'
662            }
663        })
664        .take(63)
665        .collect();
666    sanitized
667}
668
669/// Current time as Unix epoch seconds (for dedup window checks).
670fn now_epoch_secs() -> i64 {
671    std::time::SystemTime::now()
672        .duration_since(std::time::UNIX_EPOCH)
673        .unwrap_or_default()
674        .as_secs() as i64
675}
676
677/// Parse an RFC 3339 timestamp string to Unix epoch seconds.
678/// Returns `None` if parsing fails.
679fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
680    // Use jiff (already a transitive dep via k8s-openapi) for RFC 3339 parsing.
681    rfc3339
682        .parse::<jiff::Timestamp>()
683        .ok()
684        .map(|t| t.as_second())
685}
686
687/// Build an OwnerReference pointing from a plan to its parent policy.
688fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
689    OwnerReference {
690        api_version: PostgresPolicy::api_version(&()).to_string(),
691        kind: PostgresPolicy::kind(&()).to_string(),
692        name: policy.name_any(),
693        uid: policy.metadata.uid.clone().unwrap_or_default(),
694        controller: Some(true),
695        block_owner_deletion: Some(true),
696    }
697}
698
699/// Build an OwnerReference pointing from a ConfigMap to its parent plan.
700fn build_plan_owner_reference(plan: &PostgresPolicyPlan) -> OwnerReference {
701    OwnerReference {
702        api_version: PostgresPolicyPlan::api_version(&()).to_string(),
703        kind: PostgresPolicyPlan::kind(&()).to_string(),
704        name: plan.name_any(),
705        uid: plan.metadata.uid.clone().unwrap_or_default(),
706        controller: Some(true),
707        block_owner_deletion: Some(true),
708    }
709}
710
711/// Update the phase field on a plan's status.
712///
713/// When transitioning to `Applying`, also sets `applying_since` for stuck
714/// plan detection.
715async fn update_plan_phase(
716    plans_api: &Api<PostgresPolicyPlan>,
717    plan_name: &str,
718    phase: PlanPhase,
719) -> Result<(), ReconcileError> {
720    let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
721    if phase == PlanPhase::Applying {
722        patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
723    }
724    plans_api
725        .patch_status(
726            plan_name,
727            &PatchParams::apply("pgroles-operator"),
728            &Patch::Merge(&patch_value),
729        )
730        .await?;
731    Ok(())
732}
733
734/// Set or update a condition in a conditions list.
735///
736/// Preserves `last_transition_time` when the status value is unchanged
737/// (only reason/message changed), matching Kubernetes condition conventions.
738fn set_plan_condition(
739    conditions: &mut Vec<PolicyCondition>,
740    condition_type: &str,
741    status: &str,
742    reason: &str,
743    message: &str,
744) {
745    let transition_time = if let Some(existing) = conditions
746        .iter()
747        .find(|c| c.condition_type == condition_type)
748    {
749        if existing.status == status {
750            existing.last_transition_time.clone()
751        } else {
752            Some(crate::crd::now_rfc3339())
753        }
754    } else {
755        Some(crate::crd::now_rfc3339())
756    };
757
758    let condition = PolicyCondition {
759        condition_type: condition_type.to_string(),
760        status: status.to_string(),
761        reason: Some(reason.to_string()),
762        message: Some(message.to_string()),
763        last_transition_time: transition_time,
764    };
765    if let Some(existing) = conditions
766        .iter_mut()
767        .find(|c| c.condition_type == condition_type)
768    {
769        *existing = condition;
770    } else {
771        conditions.push(condition);
772    }
773}
774
775/// Update the parent policy's `current_plan_ref` in status.
776pub async fn update_policy_plan_ref(
777    client: &Client,
778    policy: &PostgresPolicy,
779    plan_name: &str,
780) -> Result<(), ReconcileError> {
781    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
782    let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
783
784    let patch = serde_json::json!({
785        "status": {
786            "current_plan_ref": PlanReference {
787                name: plan_name.to_string(),
788            }
789        }
790    });
791
792    policy_api
793        .patch_status(
794            &policy.name_any(),
795            &PatchParams::apply("pgroles-operator"),
796            &Patch::Merge(&patch),
797        )
798        .await?;
799
800    Ok(())
801}
802
803/// Look up the current actionable plan for a policy, if any.
804///
805/// An actionable plan is one in `Pending` or `Approved` phase — i.e. a plan
806/// that the reconciler should evaluate for approval/execution.
807pub async fn get_current_actionable_plan(
808    client: &Client,
809    policy: &PostgresPolicy,
810) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
811    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
812    let policy_name = policy.name_any();
813
814    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
815    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
816    let existing_plans = plans_api
817        .list(&ListParams::default().labels(&label_selector))
818        .await?;
819
820    // Find the most recent actionable plan (Pending or Approved, by creation time).
821    let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
822        .into_iter()
823        .filter(|plan| {
824            plan.status
825                .as_ref()
826                .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
827                .unwrap_or(false)
828        })
829        .collect();
830
831    pending_plans.sort_by(|a, b| {
832        let a_time = a.metadata.creation_timestamp.as_ref();
833        let b_time = b.metadata.creation_timestamp.as_ref();
834        b_time.cmp(&a_time) // newest first
835    });
836
837    Ok(pending_plans.into_iter().next())
838}
839
840/// Look up the most recent plan for a policy in a given phase.
841pub async fn get_plan_by_phase(
842    client: &Client,
843    policy: &PostgresPolicy,
844    target_phase: PlanPhase,
845) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
846    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
847    let policy_name = policy.name_any();
848
849    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
850    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
851    let existing_plans = plans_api
852        .list(&ListParams::default().labels(&label_selector))
853        .await?;
854
855    let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
856        .into_iter()
857        .filter(|plan| {
858            plan.status
859                .as_ref()
860                .map(|s| s.phase == target_phase)
861                .unwrap_or(false)
862        })
863        .collect();
864
865    matching_plans.sort_by(|a, b| {
866        let a_time = a.metadata.creation_timestamp.as_ref();
867        let b_time = b.metadata.creation_timestamp.as_ref();
868        b_time.cmp(&a_time) // newest first
869    });
870
871    Ok(matching_plans.into_iter().next())
872}
873
874/// Mark a plan as Failed with a given error message.
875pub async fn mark_plan_failed(
876    client: &Client,
877    plan: &PostgresPolicyPlan,
878    error_message: &str,
879) -> Result<(), ReconcileError> {
880    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
881    let plan_name = plan.name_any();
882    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
883
884    let mut status = plan.status.clone().unwrap_or_default();
885    status.phase = PlanPhase::Failed;
886    status.last_error = Some(error_message.to_string());
887    status.failed_at = Some(crate::crd::now_rfc3339());
888
889    let patch = serde_json::json!({ "status": status });
890    plans_api
891        .patch_status(
892            &plan_name,
893            &PatchParams::apply("pgroles-operator"),
894            &Patch::Merge(&patch),
895        )
896        .await?;
897
898    info!(
899        plan = %plan_name,
900        "marked stuck Applying plan as Failed"
901    );
902
903    Ok(())
904}
905
906/// Mark a plan as Approved.
907///
908/// Callers provide `reason` and `message` to distinguish auto-approval from
909/// manual approval in the plan's conditions.
910pub async fn mark_plan_approved(
911    client: &Client,
912    plan: &PostgresPolicyPlan,
913    reason: &str,
914    message: &str,
915) -> Result<(), ReconcileError> {
916    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
917    let plan_name = plan.name_any();
918    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
919
920    let mut status = plan.status.clone().unwrap_or_default();
921    status.phase = PlanPhase::Approved;
922    set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
923
924    let patch = serde_json::json!({ "status": status });
925    plans_api
926        .patch_status(
927            &plan_name,
928            &PatchParams::apply("pgroles-operator"),
929            &Patch::Merge(&patch),
930        )
931        .await?;
932
933    Ok(())
934}
935
936/// Mark a plan as Rejected.
937pub async fn mark_plan_rejected(
938    client: &Client,
939    plan: &PostgresPolicyPlan,
940) -> Result<(), ReconcileError> {
941    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
942    let plan_name = plan.name_any();
943    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
944
945    let mut status = plan.status.clone().unwrap_or_default();
946    status.phase = PlanPhase::Rejected;
947    set_plan_condition(
948        &mut status.conditions,
949        "Approved",
950        "False",
951        "Rejected",
952        "Plan rejected via annotation",
953    );
954
955    let patch = serde_json::json!({ "status": status });
956    plans_api
957        .patch_status(
958            &plan_name,
959            &PatchParams::apply("pgroles-operator"),
960            &Patch::Merge(&patch),
961        )
962        .await?;
963
964    Ok(())
965}
966
967/// Mark a plan as Superseded (database state changed since approval).
968pub async fn mark_plan_superseded(
969    client: &Client,
970    plan: &PostgresPolicyPlan,
971) -> Result<(), ReconcileError> {
972    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
973    let plan_name = plan.name_any();
974    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
975
976    let mut status = plan.status.clone().unwrap_or_default();
977    status.phase = PlanPhase::Superseded;
978    set_plan_condition(
979        &mut status.conditions,
980        "Approved",
981        "False",
982        "Superseded",
983        "Database state changed since plan was approved",
984    );
985
986    let patch = serde_json::json!({ "status": status });
987    plans_api
988        .patch_status(
989            &plan_name,
990            &PatchParams::apply("pgroles-operator"),
991            &Patch::Merge(&patch),
992        )
993        .await?;
994
995    Ok(())
996}
997
998// ---------------------------------------------------------------------------
999// Tests
1000// ---------------------------------------------------------------------------
1001
1002#[cfg(test)]
1003mod tests {
1004    use super::*;
1005    use crate::crd::CrdReconciliationMode;
1006
1007    fn test_plan(
1008        name: &str,
1009        phase: PlanPhase,
1010        annotations: Option<BTreeMap<String, String>>,
1011    ) -> PostgresPolicyPlan {
1012        let mut plan = PostgresPolicyPlan::new(
1013            name,
1014            PostgresPolicyPlanSpec {
1015                policy_ref: PolicyPlanRef {
1016                    name: "test-policy".to_string(),
1017                },
1018                policy_generation: 1,
1019                reconciliation_mode: CrdReconciliationMode::Authoritative,
1020                owned_roles: vec!["role-a".to_string()],
1021                owned_schemas: vec!["public".to_string()],
1022                managed_database_identity: "default/db/DATABASE_URL".to_string(),
1023            },
1024        );
1025        plan.metadata.namespace = Some("default".to_string());
1026        plan.metadata.annotations = annotations;
1027        plan.status = Some(PostgresPolicyPlanStatus {
1028            phase,
1029            ..Default::default()
1030        });
1031        plan
1032    }
1033
1034    #[test]
1035    fn check_plan_approval_pending_when_no_annotations() {
1036        let plan = test_plan("plan-1", PlanPhase::Pending, None);
1037        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1038    }
1039
1040    #[test]
1041    fn check_plan_approval_approved_with_annotation() {
1042        let annotations =
1043            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1044        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1045        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1046    }
1047
1048    #[test]
1049    fn check_plan_approval_rejected_with_annotation() {
1050        let annotations =
1051            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1052        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1053        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1054    }
1055
1056    #[test]
1057    fn check_plan_approval_rejected_wins_over_approved() {
1058        let annotations = BTreeMap::from([
1059            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1060            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1061        ]);
1062        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1063        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1064    }
1065
1066    #[test]
1067    fn check_plan_approval_non_true_value_is_pending() {
1068        let annotations =
1069            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1070        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1071        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1072    }
1073
1074    #[test]
1075    fn compute_sql_hash_is_deterministic() {
1076        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1077        let hash1 = compute_sql_hash(sql);
1078        let hash2 = compute_sql_hash(sql);
1079        assert_eq!(hash1, hash2);
1080        assert_eq!(hash1.len(), 64); // SHA-256 hex digest is 64 chars
1081    }
1082
1083    #[test]
1084    fn compute_sql_hash_differs_for_different_sql() {
1085        let hash1 = compute_sql_hash("CREATE ROLE a;");
1086        let hash2 = compute_sql_hash("CREATE ROLE b;");
1087        assert_ne!(hash1, hash2);
1088    }
1089
1090    #[test]
1091    fn generate_plan_name_has_expected_format() {
1092        let name = generate_plan_name("my-policy");
1093        assert!(name.starts_with("my-policy-plan-"));
1094        // Should be "my-policy-plan-YYYYMMDD-HHMMSS-MMMRRR"
1095        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1096        // YYYYMMDD-HHMMSS-MMMRRR = 15 + 1 + 6 = 22 chars
1097        assert_eq!(suffix.len(), 22);
1098        assert_eq!(&suffix[8..9], "-");
1099        assert_eq!(&suffix[15..16], "-");
1100    }
1101
1102    #[test]
1103    fn generate_plan_name_is_unique_across_calls() {
1104        let name1 = generate_plan_name("my-policy");
1105        let name2 = generate_plan_name("my-policy");
1106        // With millisecond + random suffix, collisions are extremely unlikely
1107        // (this test may very rarely fail, but demonstrates the intent).
1108        assert_ne!(name1, name2);
1109    }
1110
1111    #[test]
1112    fn sanitize_label_value_replaces_slashes() {
1113        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1114        assert!(!sanitized.contains('/'));
1115        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1116    }
1117
1118    #[test]
1119    fn sanitize_label_value_truncates_to_63_chars() {
1120        let long_value = "a".repeat(100);
1121        let sanitized = sanitize_label_value(&long_value);
1122        assert!(sanitized.len() <= 63);
1123    }
1124
1125    #[test]
1126    fn render_redacted_sql_masks_passwords() {
1127        let changes = vec![
1128            pgroles_core::diff::Change::CreateRole {
1129                name: "app".to_string(),
1130                state: pgroles_core::model::RoleState {
1131                    login: true,
1132                    ..pgroles_core::model::RoleState::default()
1133                },
1134            },
1135            pgroles_core::diff::Change::SetPassword {
1136                name: "app".to_string(),
1137                password: "super_secret".to_string(),
1138            },
1139        ];
1140        let ctx = pgroles_core::sql::SqlContext::default();
1141        let redacted = render_redacted_sql(&changes, &ctx);
1142
1143        assert!(redacted.contains("[REDACTED]"));
1144        assert!(!redacted.contains("super_secret"));
1145        assert!(redacted.contains("CREATE ROLE"));
1146    }
1147
1148    #[test]
1149    fn render_full_sql_includes_passwords() {
1150        let changes = vec![pgroles_core::diff::Change::SetPassword {
1151            name: "app".to_string(),
1152            password: "super_secret".to_string(),
1153        }];
1154        let ctx = pgroles_core::sql::SqlContext::default();
1155        let full = render_full_sql(&changes, &ctx);
1156
1157        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1158    }
1159
1160    #[test]
1161    fn now_epoch_secs_returns_plausible_value() {
1162        let now = now_epoch_secs();
1163        // Should be after 2025-01-01 and before 2100-01-01.
1164        let y2025 = 1_735_689_600_i64;
1165        let y2100 = 4_102_444_800_i64;
1166        assert!(
1167            now > y2025 && now < y2100,
1168            "epoch secs {now} should be between 2025 and 2100"
1169        );
1170    }
1171}