1use std::collections::BTreeMap;
8
9use k8s_openapi::api::core::v1::ConfigMap;
10use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
11use kube::api::{Api, ListParams, Patch, PatchParams, PostParams};
12use kube::{Client, Resource, ResourceExt};
13use sha2::{Digest, Sha256};
14use tracing::info;
15
16use crate::crd::{
17 ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_POLICY,
18 PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
19 PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
20 PostgresPolicyPlanStatus, SqlRef,
21};
22use crate::reconciler::ReconcileError;
23
/// Outcome of an attempt to create a plan.
///
/// Both variants carry the name of the plan the caller should work with.
#[derive(Debug, Clone)]
pub enum PlanCreationResult {
    /// A new plan resource was created; holds its name.
    Created(String),
    /// An existing plan was reused instead of creating one; holds that plan's name.
    Deduplicated(String),
}

impl PlanCreationResult {
    /// Returns the plan name, regardless of whether the plan was created or reused.
    pub fn plan_name(&self) -> &str {
        let (Self::Created(plan) | Self::Deduplicated(plan)) = self;
        plan.as_str()
    }

    /// Returns `true` only when a brand-new plan was created.
    pub fn is_created(&self) -> bool {
        match self {
            Self::Created(_) => true,
            Self::Deduplicated(_) => false,
        }
    }
}
47
/// Largest redacted SQL text (in bytes) stored inline in the plan status; anything
/// larger is written to a ConfigMap and referenced from the status instead.
const MAX_INLINE_SQL_BYTES: usize = 16 << 10; // 16 KiB

/// Key under which the SQL text is stored in the plan's ConfigMap.
const SQL_CONFIGMAP_KEY: &str = "plan.sql";

/// Number of terminal plans kept per policy when the caller gives no explicit limit.
const DEFAULT_MAX_PLANS: usize = 10;

/// A failed plan with an identical SQL hash is reused (not re-created) while its
/// failure is younger than this many seconds.
const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 2 * 60;
61
/// Approval decision read from a plan's annotations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PlanApprovalState {
    /// Neither the approval nor the rejection annotation is set to `"true"`.
    Pending,
    /// The approval annotation is `"true"` and the plan has not been rejected.
    Approved,
    /// The rejection annotation is `"true"`.
    Rejected,
}
73
74pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
79 let annotations = plan.metadata.annotations.as_ref();
80
81 let rejected = annotations
82 .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
83 .map(|v| v == "true")
84 .unwrap_or(false);
85
86 if rejected {
87 return PlanApprovalState::Rejected;
88 }
89
90 let approved = annotations
91 .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
92 .map(|v| v == "true")
93 .unwrap_or(false);
94
95 if approved {
96 return PlanApprovalState::Approved;
97 }
98
99 PlanApprovalState::Pending
100}
101
102#[allow(clippy::too_many_arguments)]
119pub async fn create_or_update_plan(
120 client: &Client,
121 policy: &PostgresPolicy,
122 changes: &[pgroles_core::diff::Change],
123 sql_context: &pgroles_core::sql::SqlContext,
124 inspect_config: &pgroles_inspect::InspectConfig,
125 reconciliation_mode: CrdReconciliationMode,
126 database_identity: &str,
127 change_summary: &ChangeSummary,
128) -> Result<PlanCreationResult, ReconcileError> {
129 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
130 let policy_name = policy.name_any();
131 let generation = policy.metadata.generation.unwrap_or(0);
132
133 let full_sql = render_full_sql(changes, sql_context);
135
136 let sql_hash = compute_sql_hash(&full_sql);
138
139 let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
141
142 let redacted_sql = render_redacted_sql(changes, sql_context);
144
145 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
146
147 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
149 let existing_plans = plans_api
150 .list(&ListParams::default().labels(&label_selector))
151 .await?;
152
153 for plan in &existing_plans {
155 if let Some(ref status) = plan.status
156 && status.phase == PlanPhase::Pending
157 && status.sql_hash.as_deref() == Some(&sql_hash)
158 {
159 let plan_name = plan.name_any();
161 info!(
162 plan = %plan_name,
163 policy = %policy_name,
164 "existing pending plan has identical SQL hash, skipping creation"
165 );
166 return Ok(PlanCreationResult::Deduplicated(plan_name));
167 }
168 }
169
170 let now_ts = now_epoch_secs();
178 for plan in &existing_plans {
179 if let Some(ref status) = plan.status
180 && status.phase == PlanPhase::Failed
181 && status.sql_hash.as_deref() == Some(&sql_hash)
182 {
183 let failed_ts = status
184 .failed_at
185 .as_deref()
186 .and_then(parse_rfc3339_epoch_secs)
187 .unwrap_or(0);
188 if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
189 let plan_name = plan.name_any();
190 info!(
191 plan = %plan_name,
192 policy = %policy_name,
193 age_secs = now_ts - failed_ts,
194 "recently-failed plan has identical SQL hash, skipping creation"
195 );
196 return Ok(PlanCreationResult::Deduplicated(plan_name));
197 }
198 }
199 }
200
201 for plan in &existing_plans {
203 if let Some(ref status) = plan.status
204 && status.phase == PlanPhase::Pending
205 {
206 let plan_name = plan.name_any();
207 info!(
208 plan = %plan_name,
209 policy = %policy_name,
210 "marking existing pending plan as Superseded"
211 );
212 let superseded_status = PostgresPolicyPlanStatus {
213 phase: PlanPhase::Superseded,
214 ..status.clone()
215 };
216 let patch = serde_json::json!({ "status": superseded_status });
217 plans_api
218 .patch_status(
219 &plan_name,
220 &PatchParams::apply("pgroles-operator"),
221 &Patch::Merge(&patch),
222 )
223 .await?;
224 }
225 }
226
227 let plan_name = generate_plan_name(&policy_name);
229
230 let owner_ref = build_owner_reference(policy);
232
233 let plan = PostgresPolicyPlan::new(
235 &plan_name,
236 PostgresPolicyPlanSpec {
237 policy_ref: PolicyPlanRef {
238 name: policy_name.clone(),
239 },
240 policy_generation: generation,
241 reconciliation_mode,
242 owned_roles: inspect_config.managed_roles.clone(),
243 owned_schemas: inspect_config.managed_schemas.clone(),
244 managed_database_identity: database_identity.to_string(),
245 },
246 );
247 let mut plan = plan;
248 plan.metadata.namespace = Some(namespace.clone());
249 plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
250 plan.metadata.labels = Some(BTreeMap::from([
251 (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
252 (
253 LABEL_DATABASE_IDENTITY.to_string(),
254 sanitize_label_value(database_identity),
255 ),
256 ]));
257
258 let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
260 let summary_text = format!(
261 "{}R {}G {}D {}DP {}M",
262 change_summary.roles_created + change_summary.roles_altered,
263 change_summary.grants_added,
264 change_summary.default_privileges_set,
265 change_summary.roles_dropped,
266 change_summary.members_added,
267 );
268 plan.metadata.annotations = Some(BTreeMap::from([
269 ("pgroles.io/sql-preview".to_string(), sql_preview),
270 ("pgroles.io/summary".to_string(), summary_text),
271 (
272 "pgroles.io/sql-hash".to_string(),
273 sql_hash[..12].to_string(),
274 ),
275 ]));
276
277 let created_plan = plans_api.create(&PostParams::default(), &plan).await?;
278 let plan_name = created_plan.name_any();
279
280 let (sql_inline, sql_ref) = if redacted_sql.len() <= MAX_INLINE_SQL_BYTES {
282 (Some(redacted_sql), None)
283 } else {
284 let configmap_name = format!("{plan_name}-sql");
286 let configmap = ConfigMap {
287 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
288 name: Some(configmap_name.clone()),
289 namespace: Some(namespace.clone()),
290 owner_references: Some(vec![build_plan_owner_reference(&created_plan)]),
291 labels: Some(BTreeMap::from([(
292 LABEL_POLICY.to_string(),
293 sanitize_label_value(&policy_name),
294 )])),
295 ..Default::default()
296 },
297 data: Some(BTreeMap::from([(
298 SQL_CONFIGMAP_KEY.to_string(),
299 redacted_sql,
300 )])),
301 ..Default::default()
302 };
303
304 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
305 configmaps_api
306 .create(&PostParams::default(), &configmap)
307 .await?;
308
309 (
310 None,
311 Some(SqlRef {
312 name: configmap_name,
313 key: SQL_CONFIGMAP_KEY.to_string(),
314 }),
315 )
316 };
317
318 let plan_status = PostgresPolicyPlanStatus {
320 phase: PlanPhase::Pending,
321 conditions: vec![
322 PolicyCondition {
323 condition_type: "Computed".to_string(),
324 status: "True".to_string(),
325 reason: Some("PlanComputed".to_string()),
326 message: Some(format!(
327 "Plan computed with {} change(s)",
328 change_summary.total
329 )),
330 last_transition_time: Some(crate::crd::now_rfc3339()),
331 },
332 PolicyCondition {
333 condition_type: "Approved".to_string(),
334 status: "False".to_string(),
335 reason: Some("PendingApproval".to_string()),
336 message: Some("Plan awaiting approval".to_string()),
337 last_transition_time: Some(crate::crd::now_rfc3339()),
338 },
339 ],
340 change_summary: Some(change_summary.clone()),
341 sql_ref,
342 sql_inline,
343 computed_at: Some(crate::crd::now_rfc3339()),
344 applied_at: None,
345 last_error: None,
346 sql_hash: Some(sql_hash),
347 applying_since: None,
348 failed_at: None,
349 sql_statements: Some(sql_statement_count),
350 };
351
352 let status_patch = serde_json::json!({ "status": plan_status });
353 plans_api
354 .patch_status(
355 &plan_name,
356 &PatchParams::apply("pgroles-operator"),
357 &Patch::Merge(&status_patch),
358 )
359 .await?;
360
361 info!(
362 plan = %plan_name,
363 policy = %policy_name,
364 changes = change_summary.total,
365 "created new plan"
366 );
367
368 Ok(PlanCreationResult::Created(plan_name))
369}
370
371pub async fn execute_plan(
380 client: &Client,
381 plan: &PostgresPolicyPlan,
382 pool: &sqlx::PgPool,
383 sql_context: &pgroles_core::sql::SqlContext,
384 changes: &[pgroles_core::diff::Change],
385) -> Result<(), ReconcileError> {
386 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
387 let plan_name = plan.name_any();
388 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
389
390 update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
392
393 let result = execute_changes_in_transaction(pool, changes, sql_context).await;
397
398 match result {
399 Ok(statements_executed) => {
400 let mut applied_status = plan.status.clone().unwrap_or_default();
402 applied_status.phase = PlanPhase::Applied;
403 applied_status.applied_at = Some(crate::crd::now_rfc3339());
404 applied_status.last_error = None;
405 set_plan_condition(
406 &mut applied_status.conditions,
407 "Approved",
408 "True",
409 "Approved",
410 "Plan approved and executed",
411 );
412
413 let patch = serde_json::json!({ "status": applied_status });
414 plans_api
415 .patch_status(
416 &plan_name,
417 &PatchParams::apply("pgroles-operator"),
418 &Patch::Merge(&patch),
419 )
420 .await?;
421
422 info!(
423 plan = %plan_name,
424 statements = statements_executed,
425 "plan executed successfully"
426 );
427 Ok(())
428 }
429 Err(err) => {
430 let error_message = err.to_string();
432 let mut failed_status = plan.status.clone().unwrap_or_default();
433 failed_status.phase = PlanPhase::Failed;
434 failed_status.last_error = Some(error_message);
435 failed_status.failed_at = Some(crate::crd::now_rfc3339());
436
437 let patch = serde_json::json!({ "status": failed_status });
438 if let Err(status_err) = plans_api
439 .patch_status(
440 &plan_name,
441 &PatchParams::apply("pgroles-operator"),
442 &Patch::Merge(&patch),
443 )
444 .await
445 {
446 tracing::warn!(
447 plan = %plan_name,
448 %status_err,
449 "failed to update plan status to Failed"
450 );
451 }
452
453 Err(err)
454 }
455 }
456}
457
458async fn execute_changes_in_transaction(
462 pool: &sqlx::PgPool,
463 changes: &[pgroles_core::diff::Change],
464 sql_context: &pgroles_core::sql::SqlContext,
465) -> Result<usize, ReconcileError> {
466 let mut transaction = pool.begin().await?;
467 let mut statements_executed = 0usize;
468
469 for change in changes {
470 let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
471 for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
472 if is_sensitive {
473 tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
474 } else {
475 tracing::debug!(%sql, "executing");
476 }
477 sqlx::query(&sql).execute(transaction.as_mut()).await?;
478 statements_executed += 1;
479 }
480 }
481
482 transaction.commit().await?;
483 Ok(statements_executed)
484}
485
486pub async fn cleanup_old_plans(
495 client: &Client,
496 policy: &PostgresPolicy,
497 max_plans: Option<usize>,
498) -> Result<(), ReconcileError> {
499 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
500 let policy_name = policy.name_any();
501 let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
502
503 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
504 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
505 let existing_plans = plans_api
506 .list(&ListParams::default().labels(&label_selector))
507 .await?;
508
509 let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
511 .iter()
512 .filter(|plan| {
513 plan.status
514 .as_ref()
515 .map(|s| {
516 matches!(
517 s.phase,
518 PlanPhase::Applied
519 | PlanPhase::Failed
520 | PlanPhase::Superseded
521 | PlanPhase::Rejected
522 )
523 })
524 .unwrap_or(false)
525 })
526 .collect();
527
528 if terminal_plans.len() <= max_plans {
529 return Ok(());
530 }
531
532 terminal_plans.sort_by(|a, b| {
534 let a_time = a.metadata.creation_timestamp.as_ref();
535 let b_time = b.metadata.creation_timestamp.as_ref();
536 a_time.cmp(&b_time)
537 });
538
539 let plans_to_delete = terminal_plans.len() - max_plans;
540 for plan in terminal_plans.into_iter().take(plans_to_delete) {
541 let plan_name = plan.name_any();
542 info!(
543 plan = %plan_name,
544 policy = %policy_name,
545 "cleaning up old plan"
546 );
547 if let Err(err) = plans_api.delete(&plan_name, &Default::default()).await {
548 tracing::warn!(
549 plan = %plan_name,
550 %err,
551 "failed to delete old plan during cleanup"
552 );
553 }
554 }
555
556 Ok(())
557}
558
559pub(crate) fn render_full_sql(
565 changes: &[pgroles_core::diff::Change],
566 sql_context: &pgroles_core::sql::SqlContext,
567) -> String {
568 changes
569 .iter()
570 .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
571 .collect::<Vec<_>>()
572 .join("\n")
573}
574
575fn render_redacted_sql(
577 changes: &[pgroles_core::diff::Change],
578 sql_context: &pgroles_core::sql::SqlContext,
579) -> String {
580 changes
581 .iter()
582 .flat_map(|change| {
583 if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
584 vec![format!(
585 "ALTER ROLE {} PASSWORD '[REDACTED]';",
586 pgroles_core::sql::quote_ident(name)
587 )]
588 } else {
589 pgroles_core::sql::render_statements_with_context(change, sql_context)
590 }
591 })
592 .collect::<Vec<_>>()
593 .join("\n")
594}
595
596pub(crate) fn compute_sql_hash(sql: &str) -> String {
598 let mut hasher = Sha256::new();
599 hasher.update(sql.as_bytes());
600 format!("{:x}", hasher.finalize())
601}
602
603fn generate_plan_name(policy_name: &str) -> String {
610 let now = std::time::SystemTime::now()
611 .duration_since(std::time::UNIX_EPOCH)
612 .unwrap_or_default();
613 let timestamp = format_timestamp_compact();
614 let millis = now.subsec_millis();
615 let random_suffix: u32 = rand::random::<u32>() % 1000;
616 let suffix = format!("{millis:03}{random_suffix:03}");
617 let max_name_len = 253 - 4; let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
621 let prefix = if policy_name.len() > max_prefix_len {
622 &policy_name[..max_prefix_len]
623 } else {
624 policy_name
625 };
626 format!("{prefix}-plan-{timestamp}-{suffix}")
627}
628
629fn format_timestamp_compact() -> String {
631 use std::time::SystemTime;
632 let now = SystemTime::now()
633 .duration_since(SystemTime::UNIX_EPOCH)
634 .unwrap_or_default();
635 let secs = now.as_secs();
636 let (year, month, day) = crate::crd::days_to_date(secs / 86400);
637 let remaining = secs % 86400;
638 let hours = remaining / 3600;
639 let minutes = (remaining % 3600) / 60;
640 let seconds = remaining % 60;
641 format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
642}
643
/// Convert an arbitrary string into a valid Kubernetes label value.
///
/// Every character other than ASCII alphanumerics, `.`, `-` and `_` is replaced
/// with `_`, and the result is cut to at most 63 characters. Leading and trailing
/// non-alphanumeric characters are then trimmed, because a non-empty label value
/// must begin and end with an alphanumeric character. Without the trim, an input
/// such as `/foo`, or a long name whose 63rd character is `-`, would produce a
/// value the API server rejects.
///
/// The result may be empty, which is itself a valid label value.
fn sanitize_label_value(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
                c
            } else {
                '_'
            }
        })
        .take(63)
        .collect();

    // Trim after truncation so a separator exposed by the cut is removed as well.
    sanitized
        .trim_matches(|c: char| !c.is_ascii_alphanumeric())
        .to_string()
}
661
/// Current time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_epoch_secs() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
669
670fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
673 rfc3339
675 .parse::<jiff::Timestamp>()
676 .ok()
677 .map(|t| t.as_second())
678}
679
680fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
682 OwnerReference {
683 api_version: PostgresPolicy::api_version(&()).to_string(),
684 kind: PostgresPolicy::kind(&()).to_string(),
685 name: policy.name_any(),
686 uid: policy.metadata.uid.clone().unwrap_or_default(),
687 controller: Some(true),
688 block_owner_deletion: Some(true),
689 }
690}
691
692fn build_plan_owner_reference(plan: &PostgresPolicyPlan) -> OwnerReference {
694 OwnerReference {
695 api_version: PostgresPolicyPlan::api_version(&()).to_string(),
696 kind: PostgresPolicyPlan::kind(&()).to_string(),
697 name: plan.name_any(),
698 uid: plan.metadata.uid.clone().unwrap_or_default(),
699 controller: Some(true),
700 block_owner_deletion: Some(true),
701 }
702}
703
704async fn update_plan_phase(
709 plans_api: &Api<PostgresPolicyPlan>,
710 plan_name: &str,
711 phase: PlanPhase,
712) -> Result<(), ReconcileError> {
713 let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
714 if phase == PlanPhase::Applying {
715 patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
716 }
717 plans_api
718 .patch_status(
719 plan_name,
720 &PatchParams::apply("pgroles-operator"),
721 &Patch::Merge(&patch_value),
722 )
723 .await?;
724 Ok(())
725}
726
727fn set_plan_condition(
732 conditions: &mut Vec<PolicyCondition>,
733 condition_type: &str,
734 status: &str,
735 reason: &str,
736 message: &str,
737) {
738 let transition_time = if let Some(existing) = conditions
739 .iter()
740 .find(|c| c.condition_type == condition_type)
741 {
742 if existing.status == status {
743 existing.last_transition_time.clone()
744 } else {
745 Some(crate::crd::now_rfc3339())
746 }
747 } else {
748 Some(crate::crd::now_rfc3339())
749 };
750
751 let condition = PolicyCondition {
752 condition_type: condition_type.to_string(),
753 status: status.to_string(),
754 reason: Some(reason.to_string()),
755 message: Some(message.to_string()),
756 last_transition_time: transition_time,
757 };
758 if let Some(existing) = conditions
759 .iter_mut()
760 .find(|c| c.condition_type == condition_type)
761 {
762 *existing = condition;
763 } else {
764 conditions.push(condition);
765 }
766}
767
768pub async fn update_policy_plan_ref(
770 client: &Client,
771 policy: &PostgresPolicy,
772 plan_name: &str,
773) -> Result<(), ReconcileError> {
774 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
775 let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
776
777 let patch = serde_json::json!({
778 "status": {
779 "current_plan_ref": PlanReference {
780 name: plan_name.to_string(),
781 }
782 }
783 });
784
785 policy_api
786 .patch_status(
787 &policy.name_any(),
788 &PatchParams::apply("pgroles-operator"),
789 &Patch::Merge(&patch),
790 )
791 .await?;
792
793 Ok(())
794}
795
796pub async fn get_current_actionable_plan(
801 client: &Client,
802 policy: &PostgresPolicy,
803) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
804 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
805 let policy_name = policy.name_any();
806
807 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
808 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
809 let existing_plans = plans_api
810 .list(&ListParams::default().labels(&label_selector))
811 .await?;
812
813 let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
815 .into_iter()
816 .filter(|plan| {
817 plan.status
818 .as_ref()
819 .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
820 .unwrap_or(false)
821 })
822 .collect();
823
824 pending_plans.sort_by(|a, b| {
825 let a_time = a.metadata.creation_timestamp.as_ref();
826 let b_time = b.metadata.creation_timestamp.as_ref();
827 b_time.cmp(&a_time) });
829
830 Ok(pending_plans.into_iter().next())
831}
832
833pub async fn get_plan_by_phase(
835 client: &Client,
836 policy: &PostgresPolicy,
837 target_phase: PlanPhase,
838) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
839 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
840 let policy_name = policy.name_any();
841
842 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
843 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
844 let existing_plans = plans_api
845 .list(&ListParams::default().labels(&label_selector))
846 .await?;
847
848 let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
849 .into_iter()
850 .filter(|plan| {
851 plan.status
852 .as_ref()
853 .map(|s| s.phase == target_phase)
854 .unwrap_or(false)
855 })
856 .collect();
857
858 matching_plans.sort_by(|a, b| {
859 let a_time = a.metadata.creation_timestamp.as_ref();
860 let b_time = b.metadata.creation_timestamp.as_ref();
861 b_time.cmp(&a_time) });
863
864 Ok(matching_plans.into_iter().next())
865}
866
867pub async fn mark_plan_failed(
869 client: &Client,
870 plan: &PostgresPolicyPlan,
871 error_message: &str,
872) -> Result<(), ReconcileError> {
873 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
874 let plan_name = plan.name_any();
875 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
876
877 let mut status = plan.status.clone().unwrap_or_default();
878 status.phase = PlanPhase::Failed;
879 status.last_error = Some(error_message.to_string());
880 status.failed_at = Some(crate::crd::now_rfc3339());
881
882 let patch = serde_json::json!({ "status": status });
883 plans_api
884 .patch_status(
885 &plan_name,
886 &PatchParams::apply("pgroles-operator"),
887 &Patch::Merge(&patch),
888 )
889 .await?;
890
891 info!(
892 plan = %plan_name,
893 "marked stuck Applying plan as Failed"
894 );
895
896 Ok(())
897}
898
899pub async fn mark_plan_approved(
904 client: &Client,
905 plan: &PostgresPolicyPlan,
906 reason: &str,
907 message: &str,
908) -> Result<(), ReconcileError> {
909 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
910 let plan_name = plan.name_any();
911 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
912
913 let mut status = plan.status.clone().unwrap_or_default();
914 status.phase = PlanPhase::Approved;
915 set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
916
917 let patch = serde_json::json!({ "status": status });
918 plans_api
919 .patch_status(
920 &plan_name,
921 &PatchParams::apply("pgroles-operator"),
922 &Patch::Merge(&patch),
923 )
924 .await?;
925
926 Ok(())
927}
928
929pub async fn mark_plan_rejected(
931 client: &Client,
932 plan: &PostgresPolicyPlan,
933) -> Result<(), ReconcileError> {
934 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
935 let plan_name = plan.name_any();
936 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
937
938 let mut status = plan.status.clone().unwrap_or_default();
939 status.phase = PlanPhase::Rejected;
940 set_plan_condition(
941 &mut status.conditions,
942 "Approved",
943 "False",
944 "Rejected",
945 "Plan rejected via annotation",
946 );
947
948 let patch = serde_json::json!({ "status": status });
949 plans_api
950 .patch_status(
951 &plan_name,
952 &PatchParams::apply("pgroles-operator"),
953 &Patch::Merge(&patch),
954 )
955 .await?;
956
957 Ok(())
958}
959
960pub async fn mark_plan_superseded(
962 client: &Client,
963 plan: &PostgresPolicyPlan,
964) -> Result<(), ReconcileError> {
965 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
966 let plan_name = plan.name_any();
967 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
968
969 let mut status = plan.status.clone().unwrap_or_default();
970 status.phase = PlanPhase::Superseded;
971 set_plan_condition(
972 &mut status.conditions,
973 "Approved",
974 "False",
975 "Superseded",
976 "Database state changed since plan was approved",
977 );
978
979 let patch = serde_json::json!({ "status": status });
980 plans_api
981 .patch_status(
982 &plan_name,
983 &PatchParams::apply("pgroles-operator"),
984 &Patch::Merge(&patch),
985 )
986 .await?;
987
988 Ok(())
989}
990
/// Unit tests for the pure helpers in this module (no cluster or database needed).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::CrdReconciliationMode;

    /// Build a minimal plan in namespace `default` with the given name, status
    /// phase and annotations.
    fn test_plan(
        name: &str,
        phase: PlanPhase,
        annotations: Option<BTreeMap<String, String>>,
    ) -> PostgresPolicyPlan {
        let mut plan = PostgresPolicyPlan::new(
            name,
            PostgresPolicyPlanSpec {
                policy_ref: PolicyPlanRef {
                    name: "test-policy".to_string(),
                },
                policy_generation: 1,
                reconciliation_mode: CrdReconciliationMode::Authoritative,
                owned_roles: vec!["role-a".to_string()],
                owned_schemas: vec!["public".to_string()],
                managed_database_identity: "default/db/DATABASE_URL".to_string(),
            },
        );
        plan.metadata.namespace = Some("default".to_string());
        plan.metadata.annotations = annotations;
        plan.status = Some(PostgresPolicyPlanStatus {
            phase,
            ..Default::default()
        });
        plan
    }

    #[test]
    fn check_plan_approval_pending_when_no_annotations() {
        let plan = test_plan("plan-1", PlanPhase::Pending, None);
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
    }

    #[test]
    fn check_plan_approval_approved_with_annotation() {
        let annotations =
            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
    }

    #[test]
    fn check_plan_approval_rejected_with_annotation() {
        let annotations =
            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
    }

    #[test]
    fn check_plan_approval_rejected_wins_over_approved() {
        let annotations = BTreeMap::from([
            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
        ]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
    }

    #[test]
    fn check_plan_approval_non_true_value_is_pending() {
        let annotations =
            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
    }

    #[test]
    fn compute_sql_hash_is_deterministic() {
        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
        let hash1 = compute_sql_hash(sql);
        let hash2 = compute_sql_hash(sql);
        assert_eq!(hash1, hash2);
        // SHA-256 is 32 bytes, i.e. 64 hex characters.
        assert_eq!(hash1.len(), 64);
    }

    #[test]
    fn compute_sql_hash_differs_for_different_sql() {
        let hash1 = compute_sql_hash("CREATE ROLE a;");
        let hash2 = compute_sql_hash("CREATE ROLE b;");
        assert_ne!(hash1, hash2);
    }

    #[test]
    fn generate_plan_name_has_expected_format() {
        let name = generate_plan_name("my-policy");
        assert!(name.starts_with("my-policy-plan-"));
        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
        // "YYYYMMDD-HHMMSS-mmmrrr": 8 + 1 + 6 + 1 + 6 characters.
        assert_eq!(suffix.len(), 22);
        assert_eq!(&suffix[8..9], "-");
        assert_eq!(&suffix[15..16], "-");
    }

    #[test]
    fn generate_plan_name_is_unique_across_calls() {
        use std::collections::HashSet;

        // Two back-to-back names share the second and usually the millisecond,
        // so they differ only in a 1-in-1000 random suffix. Comparing a single
        // pair would fail about once per thousand runs; check a batch instead.
        const SAMPLES: usize = 16;
        let names: HashSet<String> = (0..SAMPLES)
            .map(|_| generate_plan_name("my-policy"))
            .collect();
        assert!(
            names.len() >= SAMPLES / 2,
            "expected mostly distinct plan names, got {} distinct out of {SAMPLES}",
            names.len()
        );
    }

    #[test]
    fn sanitize_label_value_replaces_slashes() {
        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
        assert!(!sanitized.contains('/'));
        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
    }

    #[test]
    fn sanitize_label_value_truncates_to_63_chars() {
        let long_value = "a".repeat(100);
        let sanitized = sanitize_label_value(&long_value);
        assert_eq!(sanitized.len(), 63);
    }

    #[test]
    fn render_redacted_sql_masks_passwords() {
        let changes = vec![
            pgroles_core::diff::Change::CreateRole {
                name: "app".to_string(),
                state: pgroles_core::model::RoleState {
                    login: true,
                    ..pgroles_core::model::RoleState::default()
                },
            },
            pgroles_core::diff::Change::SetPassword {
                name: "app".to_string(),
                password: "super_secret".to_string(),
            },
        ];
        let ctx = pgroles_core::sql::SqlContext::default();
        let redacted = render_redacted_sql(&changes, &ctx);

        assert!(redacted.contains("[REDACTED]"));
        assert!(!redacted.contains("super_secret"));
        assert!(redacted.contains("CREATE ROLE"));
    }

    #[test]
    fn render_full_sql_includes_passwords() {
        let changes = vec![pgroles_core::diff::Change::SetPassword {
            name: "app".to_string(),
            password: "super_secret".to_string(),
        }];
        let ctx = pgroles_core::sql::SqlContext::default();
        let full = render_full_sql(&changes, &ctx);

        // Accept either the plain password or a SCRAM verifier in the output.
        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
    }

    #[test]
    fn now_epoch_secs_returns_plausible_value() {
        let now = now_epoch_secs();
        // 2025-01-01T00:00:00Z and 2100-01-01T00:00:00Z as epoch seconds.
        let y2025 = 1_735_689_600_i64;
        let y2100 = 4_102_444_800_i64;
        assert!(
            now > y2025 && now < y2100,
            "epoch secs {now} should be between 2025 and 2100"
        );
    }
}