1use std::collections::BTreeMap;
8
9use k8s_openapi::api::core::v1::ConfigMap;
10use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
11use kube::api::{Api, ListParams, Patch, PatchParams, PostParams};
12use kube::{Client, Resource, ResourceExt};
13use sha2::{Digest, Sha256};
14use tracing::info;
15
16use crate::crd::{
17 ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_POLICY,
18 PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
19 PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
20 PostgresPolicyPlanStatus, SqlRef,
21};
22use crate::reconciler::ReconcileError;
23
24#[derive(Debug, Clone)]
27pub enum PlanCreationResult {
28 Created(String),
30 Deduplicated(String),
32}
33
34impl PlanCreationResult {
35 pub fn plan_name(&self) -> &str {
37 match self {
38 PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
39 }
40 }
41
42 pub fn is_created(&self) -> bool {
44 matches!(self, PlanCreationResult::Created(_))
45 }
46}
47
48const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;
50
51const SQL_CONFIGMAP_KEY: &str = "plan.sql";
53
54const DEFAULT_MAX_PLANS: usize = 10;
56
57const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
61
62#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum PlanApprovalState {
69 Pending,
70 Approved,
71 Rejected,
72}
73
74pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
79 let annotations = plan.metadata.annotations.as_ref();
80
81 let rejected = annotations
82 .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
83 .map(|v| v == "true")
84 .unwrap_or(false);
85
86 if rejected {
87 return PlanApprovalState::Rejected;
88 }
89
90 let approved = annotations
91 .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
92 .map(|v| v == "true")
93 .unwrap_or(false);
94
95 if approved {
96 return PlanApprovalState::Approved;
97 }
98
99 PlanApprovalState::Pending
100}
101
102#[allow(clippy::too_many_arguments)]
119pub async fn create_or_update_plan(
120 client: &Client,
121 policy: &PostgresPolicy,
122 changes: &[pgroles_core::diff::Change],
123 sql_context: &pgroles_core::sql::SqlContext,
124 inspect_config: &pgroles_inspect::InspectConfig,
125 reconciliation_mode: CrdReconciliationMode,
126 database_identity: &str,
127 change_summary: &ChangeSummary,
128) -> Result<PlanCreationResult, ReconcileError> {
129 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
130 let policy_name = policy.name_any();
131 let generation = policy.metadata.generation.unwrap_or(0);
132
133 let full_sql = render_full_sql(changes, sql_context);
135
136 let sql_hash = compute_sql_hash(&full_sql);
138
139 let redacted_sql = render_redacted_sql(changes, sql_context);
141
142 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
143
144 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
146 let existing_plans = plans_api
147 .list(&ListParams::default().labels(&label_selector))
148 .await?;
149
150 for plan in &existing_plans {
152 if let Some(ref status) = plan.status
153 && status.phase == PlanPhase::Pending
154 && status.sql_hash.as_deref() == Some(&sql_hash)
155 {
156 let plan_name = plan.name_any();
158 info!(
159 plan = %plan_name,
160 policy = %policy_name,
161 "existing pending plan has identical SQL hash, skipping creation"
162 );
163 return Ok(PlanCreationResult::Deduplicated(plan_name));
164 }
165 }
166
167 let now_ts = now_epoch_secs();
175 for plan in &existing_plans {
176 if let Some(ref status) = plan.status
177 && status.phase == PlanPhase::Failed
178 && status.sql_hash.as_deref() == Some(&sql_hash)
179 {
180 let failed_ts = status
181 .failed_at
182 .as_deref()
183 .and_then(parse_rfc3339_epoch_secs)
184 .unwrap_or(0);
185 if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
186 let plan_name = plan.name_any();
187 info!(
188 plan = %plan_name,
189 policy = %policy_name,
190 age_secs = now_ts - failed_ts,
191 "recently-failed plan has identical SQL hash, skipping creation"
192 );
193 return Ok(PlanCreationResult::Deduplicated(plan_name));
194 }
195 }
196 }
197
198 for plan in &existing_plans {
200 if let Some(ref status) = plan.status
201 && status.phase == PlanPhase::Pending
202 {
203 let plan_name = plan.name_any();
204 info!(
205 plan = %plan_name,
206 policy = %policy_name,
207 "marking existing pending plan as Superseded"
208 );
209 let superseded_status = PostgresPolicyPlanStatus {
210 phase: PlanPhase::Superseded,
211 ..status.clone()
212 };
213 let patch = serde_json::json!({ "status": superseded_status });
214 plans_api
215 .patch_status(
216 &plan_name,
217 &PatchParams::apply("pgroles-operator"),
218 &Patch::Merge(&patch),
219 )
220 .await?;
221 }
222 }
223
224 let plan_name = generate_plan_name(&policy_name);
226
227 let owner_ref = build_owner_reference(policy);
229
230 let plan = PostgresPolicyPlan::new(
232 &plan_name,
233 PostgresPolicyPlanSpec {
234 policy_ref: PolicyPlanRef {
235 name: policy_name.clone(),
236 },
237 policy_generation: generation,
238 reconciliation_mode,
239 owned_roles: inspect_config.managed_roles.clone(),
240 owned_schemas: inspect_config.managed_schemas.clone(),
241 managed_database_identity: database_identity.to_string(),
242 },
243 );
244 let mut plan = plan;
245 plan.metadata.namespace = Some(namespace.clone());
246 plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
247 plan.metadata.labels = Some(BTreeMap::from([
248 (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
249 (
250 LABEL_DATABASE_IDENTITY.to_string(),
251 sanitize_label_value(database_identity),
252 ),
253 ]));
254
255 let created_plan = plans_api.create(&PostParams::default(), &plan).await?;
256 let plan_name = created_plan.name_any();
257
258 let (sql_inline, sql_ref) = if redacted_sql.len() <= MAX_INLINE_SQL_BYTES {
260 (Some(redacted_sql), None)
261 } else {
262 let configmap_name = format!("{plan_name}-sql");
264 let configmap = ConfigMap {
265 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
266 name: Some(configmap_name.clone()),
267 namespace: Some(namespace.clone()),
268 owner_references: Some(vec![build_plan_owner_reference(&created_plan)]),
269 labels: Some(BTreeMap::from([(
270 LABEL_POLICY.to_string(),
271 sanitize_label_value(&policy_name),
272 )])),
273 ..Default::default()
274 },
275 data: Some(BTreeMap::from([(
276 SQL_CONFIGMAP_KEY.to_string(),
277 redacted_sql,
278 )])),
279 ..Default::default()
280 };
281
282 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
283 configmaps_api
284 .create(&PostParams::default(), &configmap)
285 .await?;
286
287 (
288 None,
289 Some(SqlRef {
290 name: configmap_name,
291 key: SQL_CONFIGMAP_KEY.to_string(),
292 }),
293 )
294 };
295
296 let plan_status = PostgresPolicyPlanStatus {
298 phase: PlanPhase::Pending,
299 conditions: vec![
300 PolicyCondition {
301 condition_type: "Computed".to_string(),
302 status: "True".to_string(),
303 reason: Some("PlanComputed".to_string()),
304 message: Some(format!(
305 "Plan computed with {} change(s)",
306 change_summary.total
307 )),
308 last_transition_time: Some(crate::crd::now_rfc3339()),
309 },
310 PolicyCondition {
311 condition_type: "Approved".to_string(),
312 status: "False".to_string(),
313 reason: Some("PendingApproval".to_string()),
314 message: Some("Plan awaiting approval".to_string()),
315 last_transition_time: Some(crate::crd::now_rfc3339()),
316 },
317 ],
318 change_summary: Some(change_summary.clone()),
319 sql_ref,
320 sql_inline,
321 computed_at: Some(crate::crd::now_rfc3339()),
322 applied_at: None,
323 last_error: None,
324 sql_hash: Some(sql_hash),
325 applying_since: None,
326 failed_at: None,
327 };
328
329 let status_patch = serde_json::json!({ "status": plan_status });
330 plans_api
331 .patch_status(
332 &plan_name,
333 &PatchParams::apply("pgroles-operator"),
334 &Patch::Merge(&status_patch),
335 )
336 .await?;
337
338 info!(
339 plan = %plan_name,
340 policy = %policy_name,
341 changes = change_summary.total,
342 "created new plan"
343 );
344
345 Ok(PlanCreationResult::Created(plan_name))
346}
347
348pub async fn execute_plan(
357 client: &Client,
358 plan: &PostgresPolicyPlan,
359 pool: &sqlx::PgPool,
360 sql_context: &pgroles_core::sql::SqlContext,
361 changes: &[pgroles_core::diff::Change],
362) -> Result<(), ReconcileError> {
363 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
364 let plan_name = plan.name_any();
365 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
366
367 update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
369
370 let result = execute_changes_in_transaction(pool, changes, sql_context).await;
374
375 match result {
376 Ok(statements_executed) => {
377 let mut applied_status = plan.status.clone().unwrap_or_default();
379 applied_status.phase = PlanPhase::Applied;
380 applied_status.applied_at = Some(crate::crd::now_rfc3339());
381 applied_status.last_error = None;
382 set_plan_condition(
383 &mut applied_status.conditions,
384 "Approved",
385 "True",
386 "Approved",
387 "Plan approved and executed",
388 );
389
390 let patch = serde_json::json!({ "status": applied_status });
391 plans_api
392 .patch_status(
393 &plan_name,
394 &PatchParams::apply("pgroles-operator"),
395 &Patch::Merge(&patch),
396 )
397 .await?;
398
399 info!(
400 plan = %plan_name,
401 statements = statements_executed,
402 "plan executed successfully"
403 );
404 Ok(())
405 }
406 Err(err) => {
407 let error_message = err.to_string();
409 let mut failed_status = plan.status.clone().unwrap_or_default();
410 failed_status.phase = PlanPhase::Failed;
411 failed_status.last_error = Some(error_message);
412 failed_status.failed_at = Some(crate::crd::now_rfc3339());
413
414 let patch = serde_json::json!({ "status": failed_status });
415 if let Err(status_err) = plans_api
416 .patch_status(
417 &plan_name,
418 &PatchParams::apply("pgroles-operator"),
419 &Patch::Merge(&patch),
420 )
421 .await
422 {
423 tracing::warn!(
424 plan = %plan_name,
425 %status_err,
426 "failed to update plan status to Failed"
427 );
428 }
429
430 Err(err)
431 }
432 }
433}
434
435async fn execute_changes_in_transaction(
439 pool: &sqlx::PgPool,
440 changes: &[pgroles_core::diff::Change],
441 sql_context: &pgroles_core::sql::SqlContext,
442) -> Result<usize, ReconcileError> {
443 let mut transaction = pool.begin().await?;
444 let mut statements_executed = 0usize;
445
446 for change in changes {
447 let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
448 for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
449 if is_sensitive {
450 tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
451 } else {
452 tracing::debug!(%sql, "executing");
453 }
454 sqlx::query(&sql).execute(transaction.as_mut()).await?;
455 statements_executed += 1;
456 }
457 }
458
459 transaction.commit().await?;
460 Ok(statements_executed)
461}
462
463pub async fn cleanup_old_plans(
472 client: &Client,
473 policy: &PostgresPolicy,
474 max_plans: Option<usize>,
475) -> Result<(), ReconcileError> {
476 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
477 let policy_name = policy.name_any();
478 let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
479
480 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
481 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
482 let existing_plans = plans_api
483 .list(&ListParams::default().labels(&label_selector))
484 .await?;
485
486 let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
488 .iter()
489 .filter(|plan| {
490 plan.status
491 .as_ref()
492 .map(|s| {
493 matches!(
494 s.phase,
495 PlanPhase::Applied
496 | PlanPhase::Failed
497 | PlanPhase::Superseded
498 | PlanPhase::Rejected
499 )
500 })
501 .unwrap_or(false)
502 })
503 .collect();
504
505 if terminal_plans.len() <= max_plans {
506 return Ok(());
507 }
508
509 terminal_plans.sort_by(|a, b| {
511 let a_time = a.metadata.creation_timestamp.as_ref();
512 let b_time = b.metadata.creation_timestamp.as_ref();
513 a_time.cmp(&b_time)
514 });
515
516 let plans_to_delete = terminal_plans.len() - max_plans;
517 for plan in terminal_plans.into_iter().take(plans_to_delete) {
518 let plan_name = plan.name_any();
519 info!(
520 plan = %plan_name,
521 policy = %policy_name,
522 "cleaning up old plan"
523 );
524 if let Err(err) = plans_api.delete(&plan_name, &Default::default()).await {
525 tracing::warn!(
526 plan = %plan_name,
527 %err,
528 "failed to delete old plan during cleanup"
529 );
530 }
531 }
532
533 Ok(())
534}
535
536pub(crate) fn render_full_sql(
542 changes: &[pgroles_core::diff::Change],
543 sql_context: &pgroles_core::sql::SqlContext,
544) -> String {
545 changes
546 .iter()
547 .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
548 .collect::<Vec<_>>()
549 .join("\n")
550}
551
552fn render_redacted_sql(
554 changes: &[pgroles_core::diff::Change],
555 sql_context: &pgroles_core::sql::SqlContext,
556) -> String {
557 changes
558 .iter()
559 .flat_map(|change| {
560 if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
561 vec![format!(
562 "ALTER ROLE {} PASSWORD '[REDACTED]';",
563 pgroles_core::sql::quote_ident(name)
564 )]
565 } else {
566 pgroles_core::sql::render_statements_with_context(change, sql_context)
567 }
568 })
569 .collect::<Vec<_>>()
570 .join("\n")
571}
572
573pub(crate) fn compute_sql_hash(sql: &str) -> String {
575 let mut hasher = Sha256::new();
576 hasher.update(sql.as_bytes());
577 format!("{:x}", hasher.finalize())
578}
579
580fn generate_plan_name(policy_name: &str) -> String {
587 let now = std::time::SystemTime::now()
588 .duration_since(std::time::UNIX_EPOCH)
589 .unwrap_or_default();
590 let timestamp = format_timestamp_compact();
591 let millis = now.subsec_millis();
592 let random_suffix: u32 = rand::random::<u32>() % 1000;
593 let suffix = format!("{millis:03}{random_suffix:03}");
594 let max_name_len = 253 - 4; let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
598 let prefix = if policy_name.len() > max_prefix_len {
599 &policy_name[..max_prefix_len]
600 } else {
601 policy_name
602 };
603 format!("{prefix}-plan-{timestamp}-{suffix}")
604}
605
606fn format_timestamp_compact() -> String {
608 use std::time::SystemTime;
609 let now = SystemTime::now()
610 .duration_since(SystemTime::UNIX_EPOCH)
611 .unwrap_or_default();
612 let secs = now.as_secs();
613 let (year, month, day) = crate::crd::days_to_date(secs / 86400);
614 let remaining = secs % 86400;
615 let hours = remaining / 3600;
616 let minutes = (remaining % 3600) / 60;
617 let seconds = remaining % 60;
618 format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
619}
620
621fn sanitize_label_value(value: &str) -> String {
625 let sanitized: String = value
626 .chars()
627 .map(|c| {
628 if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
629 c
630 } else {
631 '_'
632 }
633 })
634 .take(63)
635 .collect();
636 sanitized
637}
638
639fn now_epoch_secs() -> i64 {
641 std::time::SystemTime::now()
642 .duration_since(std::time::UNIX_EPOCH)
643 .unwrap_or_default()
644 .as_secs() as i64
645}
646
647fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
650 rfc3339
652 .parse::<jiff::Timestamp>()
653 .ok()
654 .map(|t| t.as_second())
655}
656
657fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
659 OwnerReference {
660 api_version: PostgresPolicy::api_version(&()).to_string(),
661 kind: PostgresPolicy::kind(&()).to_string(),
662 name: policy.name_any(),
663 uid: policy.metadata.uid.clone().unwrap_or_default(),
664 controller: Some(true),
665 block_owner_deletion: Some(true),
666 }
667}
668
669fn build_plan_owner_reference(plan: &PostgresPolicyPlan) -> OwnerReference {
671 OwnerReference {
672 api_version: PostgresPolicyPlan::api_version(&()).to_string(),
673 kind: PostgresPolicyPlan::kind(&()).to_string(),
674 name: plan.name_any(),
675 uid: plan.metadata.uid.clone().unwrap_or_default(),
676 controller: Some(true),
677 block_owner_deletion: Some(true),
678 }
679}
680
681async fn update_plan_phase(
686 plans_api: &Api<PostgresPolicyPlan>,
687 plan_name: &str,
688 phase: PlanPhase,
689) -> Result<(), ReconcileError> {
690 let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
691 if phase == PlanPhase::Applying {
692 patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
693 }
694 plans_api
695 .patch_status(
696 plan_name,
697 &PatchParams::apply("pgroles-operator"),
698 &Patch::Merge(&patch_value),
699 )
700 .await?;
701 Ok(())
702}
703
704fn set_plan_condition(
709 conditions: &mut Vec<PolicyCondition>,
710 condition_type: &str,
711 status: &str,
712 reason: &str,
713 message: &str,
714) {
715 let transition_time = if let Some(existing) = conditions
716 .iter()
717 .find(|c| c.condition_type == condition_type)
718 {
719 if existing.status == status {
720 existing.last_transition_time.clone()
721 } else {
722 Some(crate::crd::now_rfc3339())
723 }
724 } else {
725 Some(crate::crd::now_rfc3339())
726 };
727
728 let condition = PolicyCondition {
729 condition_type: condition_type.to_string(),
730 status: status.to_string(),
731 reason: Some(reason.to_string()),
732 message: Some(message.to_string()),
733 last_transition_time: transition_time,
734 };
735 if let Some(existing) = conditions
736 .iter_mut()
737 .find(|c| c.condition_type == condition_type)
738 {
739 *existing = condition;
740 } else {
741 conditions.push(condition);
742 }
743}
744
745pub async fn update_policy_plan_ref(
747 client: &Client,
748 policy: &PostgresPolicy,
749 plan_name: &str,
750) -> Result<(), ReconcileError> {
751 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
752 let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
753
754 let patch = serde_json::json!({
755 "status": {
756 "current_plan_ref": PlanReference {
757 name: plan_name.to_string(),
758 }
759 }
760 });
761
762 policy_api
763 .patch_status(
764 &policy.name_any(),
765 &PatchParams::apply("pgroles-operator"),
766 &Patch::Merge(&patch),
767 )
768 .await?;
769
770 Ok(())
771}
772
773pub async fn get_current_actionable_plan(
778 client: &Client,
779 policy: &PostgresPolicy,
780) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
781 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
782 let policy_name = policy.name_any();
783
784 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
785 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
786 let existing_plans = plans_api
787 .list(&ListParams::default().labels(&label_selector))
788 .await?;
789
790 let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
792 .into_iter()
793 .filter(|plan| {
794 plan.status
795 .as_ref()
796 .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
797 .unwrap_or(false)
798 })
799 .collect();
800
801 pending_plans.sort_by(|a, b| {
802 let a_time = a.metadata.creation_timestamp.as_ref();
803 let b_time = b.metadata.creation_timestamp.as_ref();
804 b_time.cmp(&a_time) });
806
807 Ok(pending_plans.into_iter().next())
808}
809
810pub async fn get_plan_by_phase(
812 client: &Client,
813 policy: &PostgresPolicy,
814 target_phase: PlanPhase,
815) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
816 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
817 let policy_name = policy.name_any();
818
819 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
820 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
821 let existing_plans = plans_api
822 .list(&ListParams::default().labels(&label_selector))
823 .await?;
824
825 let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
826 .into_iter()
827 .filter(|plan| {
828 plan.status
829 .as_ref()
830 .map(|s| s.phase == target_phase)
831 .unwrap_or(false)
832 })
833 .collect();
834
835 matching_plans.sort_by(|a, b| {
836 let a_time = a.metadata.creation_timestamp.as_ref();
837 let b_time = b.metadata.creation_timestamp.as_ref();
838 b_time.cmp(&a_time) });
840
841 Ok(matching_plans.into_iter().next())
842}
843
844pub async fn mark_plan_failed(
846 client: &Client,
847 plan: &PostgresPolicyPlan,
848 error_message: &str,
849) -> Result<(), ReconcileError> {
850 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
851 let plan_name = plan.name_any();
852 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
853
854 let mut status = plan.status.clone().unwrap_or_default();
855 status.phase = PlanPhase::Failed;
856 status.last_error = Some(error_message.to_string());
857 status.failed_at = Some(crate::crd::now_rfc3339());
858
859 let patch = serde_json::json!({ "status": status });
860 plans_api
861 .patch_status(
862 &plan_name,
863 &PatchParams::apply("pgroles-operator"),
864 &Patch::Merge(&patch),
865 )
866 .await?;
867
868 info!(
869 plan = %plan_name,
870 "marked stuck Applying plan as Failed"
871 );
872
873 Ok(())
874}
875
876pub async fn mark_plan_approved(
881 client: &Client,
882 plan: &PostgresPolicyPlan,
883 reason: &str,
884 message: &str,
885) -> Result<(), ReconcileError> {
886 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
887 let plan_name = plan.name_any();
888 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
889
890 let mut status = plan.status.clone().unwrap_or_default();
891 status.phase = PlanPhase::Approved;
892 set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
893
894 let patch = serde_json::json!({ "status": status });
895 plans_api
896 .patch_status(
897 &plan_name,
898 &PatchParams::apply("pgroles-operator"),
899 &Patch::Merge(&patch),
900 )
901 .await?;
902
903 Ok(())
904}
905
906pub async fn mark_plan_rejected(
908 client: &Client,
909 plan: &PostgresPolicyPlan,
910) -> Result<(), ReconcileError> {
911 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
912 let plan_name = plan.name_any();
913 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
914
915 let mut status = plan.status.clone().unwrap_or_default();
916 status.phase = PlanPhase::Rejected;
917 set_plan_condition(
918 &mut status.conditions,
919 "Approved",
920 "False",
921 "Rejected",
922 "Plan rejected via annotation",
923 );
924
925 let patch = serde_json::json!({ "status": status });
926 plans_api
927 .patch_status(
928 &plan_name,
929 &PatchParams::apply("pgroles-operator"),
930 &Patch::Merge(&patch),
931 )
932 .await?;
933
934 Ok(())
935}
936
937pub async fn mark_plan_superseded(
939 client: &Client,
940 plan: &PostgresPolicyPlan,
941) -> Result<(), ReconcileError> {
942 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
943 let plan_name = plan.name_any();
944 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
945
946 let mut status = plan.status.clone().unwrap_or_default();
947 status.phase = PlanPhase::Superseded;
948 set_plan_condition(
949 &mut status.conditions,
950 "Approved",
951 "False",
952 "Superseded",
953 "Database state changed since plan was approved",
954 );
955
956 let patch = serde_json::json!({ "status": status });
957 plans_api
958 .patch_status(
959 &plan_name,
960 &PatchParams::apply("pgroles-operator"),
961 &Patch::Merge(&patch),
962 )
963 .await?;
964
965 Ok(())
966}
967
968#[cfg(test)]
973mod tests {
974 use super::*;
975 use crate::crd::CrdReconciliationMode;
976
977 fn test_plan(
978 name: &str,
979 phase: PlanPhase,
980 annotations: Option<BTreeMap<String, String>>,
981 ) -> PostgresPolicyPlan {
982 let mut plan = PostgresPolicyPlan::new(
983 name,
984 PostgresPolicyPlanSpec {
985 policy_ref: PolicyPlanRef {
986 name: "test-policy".to_string(),
987 },
988 policy_generation: 1,
989 reconciliation_mode: CrdReconciliationMode::Authoritative,
990 owned_roles: vec!["role-a".to_string()],
991 owned_schemas: vec!["public".to_string()],
992 managed_database_identity: "default/db/DATABASE_URL".to_string(),
993 },
994 );
995 plan.metadata.namespace = Some("default".to_string());
996 plan.metadata.annotations = annotations;
997 plan.status = Some(PostgresPolicyPlanStatus {
998 phase,
999 ..Default::default()
1000 });
1001 plan
1002 }
1003
1004 #[test]
1005 fn check_plan_approval_pending_when_no_annotations() {
1006 let plan = test_plan("plan-1", PlanPhase::Pending, None);
1007 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1008 }
1009
1010 #[test]
1011 fn check_plan_approval_approved_with_annotation() {
1012 let annotations =
1013 BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1014 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1015 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1016 }
1017
1018 #[test]
1019 fn check_plan_approval_rejected_with_annotation() {
1020 let annotations =
1021 BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1022 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1023 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1024 }
1025
1026 #[test]
1027 fn check_plan_approval_rejected_wins_over_approved() {
1028 let annotations = BTreeMap::from([
1029 (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1030 (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1031 ]);
1032 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1033 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1034 }
1035
1036 #[test]
1037 fn check_plan_approval_non_true_value_is_pending() {
1038 let annotations =
1039 BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1040 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1041 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1042 }
1043
1044 #[test]
1045 fn compute_sql_hash_is_deterministic() {
1046 let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1047 let hash1 = compute_sql_hash(sql);
1048 let hash2 = compute_sql_hash(sql);
1049 assert_eq!(hash1, hash2);
1050 assert_eq!(hash1.len(), 64); }
1052
1053 #[test]
1054 fn compute_sql_hash_differs_for_different_sql() {
1055 let hash1 = compute_sql_hash("CREATE ROLE a;");
1056 let hash2 = compute_sql_hash("CREATE ROLE b;");
1057 assert_ne!(hash1, hash2);
1058 }
1059
1060 #[test]
1061 fn generate_plan_name_has_expected_format() {
1062 let name = generate_plan_name("my-policy");
1063 assert!(name.starts_with("my-policy-plan-"));
1064 let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1066 assert_eq!(suffix.len(), 22);
1068 assert_eq!(&suffix[8..9], "-");
1069 assert_eq!(&suffix[15..16], "-");
1070 }
1071
1072 #[test]
1073 fn generate_plan_name_is_unique_across_calls() {
1074 let name1 = generate_plan_name("my-policy");
1075 let name2 = generate_plan_name("my-policy");
1076 assert_ne!(name1, name2);
1079 }
1080
1081 #[test]
1082 fn sanitize_label_value_replaces_slashes() {
1083 let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1084 assert!(!sanitized.contains('/'));
1085 assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1086 }
1087
1088 #[test]
1089 fn sanitize_label_value_truncates_to_63_chars() {
1090 let long_value = "a".repeat(100);
1091 let sanitized = sanitize_label_value(&long_value);
1092 assert!(sanitized.len() <= 63);
1093 }
1094
1095 #[test]
1096 fn render_redacted_sql_masks_passwords() {
1097 let changes = vec![
1098 pgroles_core::diff::Change::CreateRole {
1099 name: "app".to_string(),
1100 state: pgroles_core::model::RoleState {
1101 login: true,
1102 ..pgroles_core::model::RoleState::default()
1103 },
1104 },
1105 pgroles_core::diff::Change::SetPassword {
1106 name: "app".to_string(),
1107 password: "super_secret".to_string(),
1108 },
1109 ];
1110 let ctx = pgroles_core::sql::SqlContext::default();
1111 let redacted = render_redacted_sql(&changes, &ctx);
1112
1113 assert!(redacted.contains("[REDACTED]"));
1114 assert!(!redacted.contains("super_secret"));
1115 assert!(redacted.contains("CREATE ROLE"));
1116 }
1117
1118 #[test]
1119 fn render_full_sql_includes_passwords() {
1120 let changes = vec![pgroles_core::diff::Change::SetPassword {
1121 name: "app".to_string(),
1122 password: "super_secret".to_string(),
1123 }];
1124 let ctx = pgroles_core::sql::SqlContext::default();
1125 let full = render_full_sql(&changes, &ctx);
1126
1127 assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1128 }
1129
1130 #[test]
1131 fn now_epoch_secs_returns_plausible_value() {
1132 let now = now_epoch_secs();
1133 let y2025 = 1_735_689_600_i64;
1135 let y2100 = 4_102_444_800_i64;
1136 assert!(
1137 now > y2025 && now < y2100,
1138 "epoch secs {now} should be between 2025 and 2100"
1139 );
1140 }
1141}