1use std::collections::BTreeMap;
8
9use k8s_openapi::api::core::v1::ConfigMap;
10use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
11use kube::api::{Api, ListParams, Patch, PatchParams, PostParams};
12use kube::{Client, Resource, ResourceExt};
13use sha2::{Digest, Sha256};
14use tracing::info;
15
16use crate::crd::{
17 ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_POLICY,
18 PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
19 PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
20 PostgresPolicyPlanStatus, SqlRef,
21};
22use crate::reconciler::ReconcileError;
23
24#[derive(Debug, Clone)]
27pub enum PlanCreationResult {
28 Created(String),
30 Deduplicated(String),
32}
33
34impl PlanCreationResult {
35 pub fn plan_name(&self) -> &str {
37 match self {
38 PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
39 }
40 }
41
42 pub fn is_created(&self) -> bool {
44 matches!(self, PlanCreationResult::Created(_))
45 }
46}
47
48const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;
50
51const SQL_CONFIGMAP_KEY: &str = "plan.sql";
53
54const DEFAULT_MAX_PLANS: usize = 10;
56
57const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
61
62#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum PlanApprovalState {
69 Pending,
70 Approved,
71 Rejected,
72}
73
74pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
79 let annotations = plan.metadata.annotations.as_ref();
80
81 let rejected = annotations
82 .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
83 .map(|v| v == "true")
84 .unwrap_or(false);
85
86 if rejected {
87 return PlanApprovalState::Rejected;
88 }
89
90 let approved = annotations
91 .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
92 .map(|v| v == "true")
93 .unwrap_or(false);
94
95 if approved {
96 return PlanApprovalState::Approved;
97 }
98
99 PlanApprovalState::Pending
100}
101
102#[allow(clippy::too_many_arguments)]
119pub async fn create_or_update_plan(
120 client: &Client,
121 policy: &PostgresPolicy,
122 changes: &[pgroles_core::diff::Change],
123 sql_context: &pgroles_core::sql::SqlContext,
124 inspect_config: &pgroles_inspect::InspectConfig,
125 reconciliation_mode: CrdReconciliationMode,
126 database_identity: &str,
127 change_summary: &ChangeSummary,
128) -> Result<PlanCreationResult, ReconcileError> {
129 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
130 let policy_name = policy.name_any();
131 let generation = policy.metadata.generation.unwrap_or(0);
132
133 let full_sql = render_full_sql(changes, sql_context);
135
136 let sql_hash = compute_sql_hash(&full_sql);
138
139 let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
141
142 let redacted_sql = render_redacted_sql(changes, sql_context);
144
145 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
146
147 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
149 let existing_plans = plans_api
150 .list(&ListParams::default().labels(&label_selector))
151 .await?;
152
153 for plan in &existing_plans {
155 if let Some(ref status) = plan.status
156 && status.phase == PlanPhase::Pending
157 && status.sql_hash.as_deref() == Some(&sql_hash)
158 {
159 let plan_name = plan.name_any();
161 info!(
162 plan = %plan_name,
163 policy = %policy_name,
164 "existing pending plan has identical SQL hash, skipping creation"
165 );
166 return Ok(PlanCreationResult::Deduplicated(plan_name));
167 }
168 }
169
170 let now_ts = now_epoch_secs();
178 for plan in &existing_plans {
179 if let Some(ref status) = plan.status
180 && status.phase == PlanPhase::Failed
181 && status.sql_hash.as_deref() == Some(&sql_hash)
182 {
183 let failed_ts = status
184 .failed_at
185 .as_deref()
186 .and_then(parse_rfc3339_epoch_secs)
187 .unwrap_or(0);
188 if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
189 let plan_name = plan.name_any();
190 info!(
191 plan = %plan_name,
192 policy = %policy_name,
193 age_secs = now_ts - failed_ts,
194 "recently-failed plan has identical SQL hash, skipping creation"
195 );
196 return Ok(PlanCreationResult::Deduplicated(plan_name));
197 }
198 }
199 }
200
201 for plan in &existing_plans {
203 if let Some(ref status) = plan.status
204 && status.phase == PlanPhase::Pending
205 {
206 let plan_name = plan.name_any();
207 info!(
208 plan = %plan_name,
209 policy = %policy_name,
210 "marking existing pending plan as Superseded"
211 );
212 let superseded_status = PostgresPolicyPlanStatus {
213 phase: PlanPhase::Superseded,
214 ..status.clone()
215 };
216 let patch = serde_json::json!({ "status": superseded_status });
217 plans_api
218 .patch_status(
219 &plan_name,
220 &PatchParams::apply("pgroles-operator"),
221 &Patch::Merge(&patch),
222 )
223 .await?;
224 }
225 }
226
227 let plan_name = generate_plan_name(&policy_name);
229
230 let owner_ref = build_owner_reference(policy);
232
233 let plan = PostgresPolicyPlan::new(
235 &plan_name,
236 PostgresPolicyPlanSpec {
237 policy_ref: PolicyPlanRef {
238 name: policy_name.clone(),
239 },
240 policy_generation: generation,
241 reconciliation_mode,
242 owned_roles: inspect_config.managed_roles.clone(),
243 owned_schemas: inspect_config.managed_schemas.clone(),
244 managed_database_identity: database_identity.to_string(),
245 },
246 );
247 let mut plan = plan;
248 plan.metadata.namespace = Some(namespace.clone());
249 plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
250 plan.metadata.labels = Some(BTreeMap::from([
251 (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
252 (
253 LABEL_DATABASE_IDENTITY.to_string(),
254 sanitize_label_value(database_identity),
255 ),
256 ]));
257
258 let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
260 let summary_text = format!(
261 "{}R {}G {}D {}DP {}M",
262 change_summary.roles_created + change_summary.roles_altered,
263 change_summary.grants_added,
264 change_summary.default_privileges_set,
265 change_summary.roles_dropped,
266 change_summary.members_added,
267 );
268 plan.metadata.annotations = Some(BTreeMap::from([
269 ("pgroles.io/sql-preview".to_string(), sql_preview),
270 ("pgroles.io/summary".to_string(), summary_text),
271 (
272 "pgroles.io/sql-hash".to_string(),
273 sql_hash[..12].to_string(),
274 ),
275 ]));
276
277 let created_plan = plans_api.create(&PostParams::default(), &plan).await?;
278 let plan_name = created_plan.name_any();
279
280 let (sql_inline, sql_ref) = if redacted_sql.len() <= MAX_INLINE_SQL_BYTES {
282 (Some(redacted_sql), None)
283 } else {
284 let configmap_name = format!("{plan_name}-sql");
286 let configmap = ConfigMap {
287 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
288 name: Some(configmap_name.clone()),
289 namespace: Some(namespace.clone()),
290 owner_references: Some(vec![build_plan_owner_reference(&created_plan)]),
291 labels: Some(BTreeMap::from([(
292 LABEL_POLICY.to_string(),
293 sanitize_label_value(&policy_name),
294 )])),
295 ..Default::default()
296 },
297 data: Some(BTreeMap::from([(
298 SQL_CONFIGMAP_KEY.to_string(),
299 redacted_sql,
300 )])),
301 ..Default::default()
302 };
303
304 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
305 configmaps_api
306 .create(&PostParams::default(), &configmap)
307 .await?;
308
309 (
310 None,
311 Some(SqlRef {
312 name: configmap_name,
313 key: SQL_CONFIGMAP_KEY.to_string(),
314 }),
315 )
316 };
317
318 let plan_status = PostgresPolicyPlanStatus {
320 phase: PlanPhase::Pending,
321 conditions: vec![
322 PolicyCondition {
323 condition_type: "Computed".to_string(),
324 status: "True".to_string(),
325 reason: Some("PlanComputed".to_string()),
326 message: Some(format!(
327 "Plan computed with {} change(s)",
328 change_summary.total
329 )),
330 last_transition_time: Some(crate::crd::now_rfc3339()),
331 },
332 PolicyCondition {
333 condition_type: "Approved".to_string(),
334 status: "False".to_string(),
335 reason: Some("PendingApproval".to_string()),
336 message: Some("Plan awaiting approval".to_string()),
337 last_transition_time: Some(crate::crd::now_rfc3339()),
338 },
339 ],
340 change_summary: Some(change_summary.clone()),
341 sql_ref,
342 sql_inline,
343 computed_at: Some(crate::crd::now_rfc3339()),
344 applied_at: None,
345 last_error: None,
346 sql_hash: Some(sql_hash),
347 applying_since: None,
348 failed_at: None,
349 sql_statements: Some(sql_statement_count),
350 };
351
352 let status_patch = serde_json::json!({ "status": plan_status });
353 plans_api
354 .patch_status(
355 &plan_name,
356 &PatchParams::apply("pgroles-operator"),
357 &Patch::Merge(&status_patch),
358 )
359 .await?;
360
361 info!(
362 plan = %plan_name,
363 policy = %policy_name,
364 changes = change_summary.total,
365 "created new plan"
366 );
367
368 Ok(PlanCreationResult::Created(plan_name))
369}
370
371pub async fn execute_plan(
380 client: &Client,
381 plan: &PostgresPolicyPlan,
382 pool: &sqlx::PgPool,
383 sql_context: &pgroles_core::sql::SqlContext,
384 changes: &[pgroles_core::diff::Change],
385) -> Result<(), ReconcileError> {
386 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
387 let plan_name = plan.name_any();
388 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
389
390 update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
392
393 let result = execute_changes_in_transaction(pool, changes, sql_context).await;
397
398 match result {
399 Ok(statements_executed) => {
400 let mut applied_status = plan.status.clone().unwrap_or_default();
402 applied_status.phase = PlanPhase::Applied;
403 applied_status.applied_at = Some(crate::crd::now_rfc3339());
404 applied_status.last_error = None;
405 set_plan_condition(
406 &mut applied_status.conditions,
407 "Approved",
408 "True",
409 "Approved",
410 "Plan approved and executed",
411 );
412
413 let patch = serde_json::json!({ "status": applied_status });
414 plans_api
415 .patch_status(
416 &plan_name,
417 &PatchParams::apply("pgroles-operator"),
418 &Patch::Merge(&patch),
419 )
420 .await?;
421
422 info!(
423 plan = %plan_name,
424 statements = statements_executed,
425 "plan executed successfully"
426 );
427 Ok(())
428 }
429 Err(err) => {
430 let error_message = err.to_string();
432 let mut failed_status = plan.status.clone().unwrap_or_default();
433 failed_status.phase = PlanPhase::Failed;
434 failed_status.last_error = Some(error_message);
435 failed_status.failed_at = Some(crate::crd::now_rfc3339());
436
437 let patch = serde_json::json!({ "status": failed_status });
438 if let Err(status_err) = plans_api
439 .patch_status(
440 &plan_name,
441 &PatchParams::apply("pgroles-operator"),
442 &Patch::Merge(&patch),
443 )
444 .await
445 {
446 tracing::warn!(
447 plan = %plan_name,
448 %status_err,
449 "failed to update plan status to Failed"
450 );
451 }
452
453 Err(err)
454 }
455 }
456}
457
458async fn execute_changes_in_transaction(
462 pool: &sqlx::PgPool,
463 changes: &[pgroles_core::diff::Change],
464 sql_context: &pgroles_core::sql::SqlContext,
465) -> Result<usize, ReconcileError> {
466 let mut transaction = pool.begin().await?;
467 let mut statements_executed = 0usize;
468
469 for change in changes {
470 let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
471 for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
472 if is_sensitive {
473 tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
474 } else {
475 tracing::debug!(%sql, "executing");
476 }
477 sqlx::query(&sql).execute(transaction.as_mut()).await?;
478 statements_executed += 1;
479 }
480 }
481
482 transaction.commit().await?;
483 Ok(statements_executed)
484}
485
486pub async fn cleanup_old_plans(
495 client: &Client,
496 policy: &PostgresPolicy,
497 max_plans: Option<usize>,
498) -> Result<(), ReconcileError> {
499 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
500 let policy_name = policy.name_any();
501 let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
502
503 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
504 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
505 let existing_plans = plans_api
506 .list(&ListParams::default().labels(&label_selector))
507 .await?;
508
509 let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
511 .iter()
512 .filter(|plan| {
513 plan.status
514 .as_ref()
515 .map(|s| {
516 matches!(
517 s.phase,
518 PlanPhase::Applied
519 | PlanPhase::Failed
520 | PlanPhase::Superseded
521 | PlanPhase::Rejected
522 )
523 })
524 .unwrap_or(false)
525 })
526 .collect();
527
528 if terminal_plans.len() <= max_plans {
529 return Ok(());
530 }
531
532 terminal_plans.sort_by(|a, b| {
534 let a_time = a.metadata.creation_timestamp.as_ref();
535 let b_time = b.metadata.creation_timestamp.as_ref();
536 a_time.cmp(&b_time)
537 });
538
539 let plans_to_delete = terminal_plans.len() - max_plans;
540 for plan in terminal_plans.into_iter().take(plans_to_delete) {
541 let plan_name = plan.name_any();
542 info!(
543 plan = %plan_name,
544 policy = %policy_name,
545 "cleaning up old plan"
546 );
547 if let Err(err) = plans_api.delete(&plan_name, &Default::default()).await {
548 tracing::warn!(
549 plan = %plan_name,
550 %err,
551 "failed to delete old plan during cleanup"
552 );
553 }
554 }
555
556 Ok(())
557}
558
559pub(crate) fn render_full_sql(
565 changes: &[pgroles_core::diff::Change],
566 sql_context: &pgroles_core::sql::SqlContext,
567) -> String {
568 changes
569 .iter()
570 .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
571 .collect::<Vec<_>>()
572 .join("\n")
573}
574
575fn render_redacted_sql(
577 changes: &[pgroles_core::diff::Change],
578 sql_context: &pgroles_core::sql::SqlContext,
579) -> String {
580 changes
581 .iter()
582 .flat_map(|change| {
583 if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
584 vec![format!(
585 "ALTER ROLE {} PASSWORD '[REDACTED]';",
586 pgroles_core::sql::quote_ident(name)
587 )]
588 } else {
589 pgroles_core::sql::render_statements_with_context(change, sql_context)
590 }
591 })
592 .collect::<Vec<_>>()
593 .join("\n")
594}
595
596pub(crate) fn compute_sql_hash(sql: &str) -> String {
598 use std::fmt::Write as _;
599
600 let mut hasher = Sha256::new();
601 hasher.update(sql.as_bytes());
602 let digest = hasher.finalize();
603 let mut hex = String::with_capacity(digest.len() * 2);
604 for byte in digest {
605 write!(&mut hex, "{byte:02x}").expect("writing to a string should succeed");
606 }
607 hex
608}
609
610fn generate_plan_name(policy_name: &str) -> String {
617 let now = std::time::SystemTime::now()
618 .duration_since(std::time::UNIX_EPOCH)
619 .unwrap_or_default();
620 let timestamp = format_timestamp_compact();
621 let millis = now.subsec_millis();
622 let random_suffix: u32 = rand::random::<u32>() % 1000;
623 let suffix = format!("{millis:03}{random_suffix:03}");
624 let max_name_len = 253 - 4; let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
628 let prefix = if policy_name.len() > max_prefix_len {
629 &policy_name[..max_prefix_len]
630 } else {
631 policy_name
632 };
633 format!("{prefix}-plan-{timestamp}-{suffix}")
634}
635
636fn format_timestamp_compact() -> String {
638 use std::time::SystemTime;
639 let now = SystemTime::now()
640 .duration_since(SystemTime::UNIX_EPOCH)
641 .unwrap_or_default();
642 let secs = now.as_secs();
643 let (year, month, day) = crate::crd::days_to_date(secs / 86400);
644 let remaining = secs % 86400;
645 let hours = remaining / 3600;
646 let minutes = (remaining % 3600) / 60;
647 let seconds = remaining % 60;
648 format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
649}
650
651fn sanitize_label_value(value: &str) -> String {
655 let sanitized: String = value
656 .chars()
657 .map(|c| {
658 if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
659 c
660 } else {
661 '_'
662 }
663 })
664 .take(63)
665 .collect();
666 sanitized
667}
668
669fn now_epoch_secs() -> i64 {
671 std::time::SystemTime::now()
672 .duration_since(std::time::UNIX_EPOCH)
673 .unwrap_or_default()
674 .as_secs() as i64
675}
676
677fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
680 rfc3339
682 .parse::<jiff::Timestamp>()
683 .ok()
684 .map(|t| t.as_second())
685}
686
687fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
689 OwnerReference {
690 api_version: PostgresPolicy::api_version(&()).to_string(),
691 kind: PostgresPolicy::kind(&()).to_string(),
692 name: policy.name_any(),
693 uid: policy.metadata.uid.clone().unwrap_or_default(),
694 controller: Some(true),
695 block_owner_deletion: Some(true),
696 }
697}
698
699fn build_plan_owner_reference(plan: &PostgresPolicyPlan) -> OwnerReference {
701 OwnerReference {
702 api_version: PostgresPolicyPlan::api_version(&()).to_string(),
703 kind: PostgresPolicyPlan::kind(&()).to_string(),
704 name: plan.name_any(),
705 uid: plan.metadata.uid.clone().unwrap_or_default(),
706 controller: Some(true),
707 block_owner_deletion: Some(true),
708 }
709}
710
711async fn update_plan_phase(
716 plans_api: &Api<PostgresPolicyPlan>,
717 plan_name: &str,
718 phase: PlanPhase,
719) -> Result<(), ReconcileError> {
720 let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
721 if phase == PlanPhase::Applying {
722 patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
723 }
724 plans_api
725 .patch_status(
726 plan_name,
727 &PatchParams::apply("pgroles-operator"),
728 &Patch::Merge(&patch_value),
729 )
730 .await?;
731 Ok(())
732}
733
734fn set_plan_condition(
739 conditions: &mut Vec<PolicyCondition>,
740 condition_type: &str,
741 status: &str,
742 reason: &str,
743 message: &str,
744) {
745 let transition_time = if let Some(existing) = conditions
746 .iter()
747 .find(|c| c.condition_type == condition_type)
748 {
749 if existing.status == status {
750 existing.last_transition_time.clone()
751 } else {
752 Some(crate::crd::now_rfc3339())
753 }
754 } else {
755 Some(crate::crd::now_rfc3339())
756 };
757
758 let condition = PolicyCondition {
759 condition_type: condition_type.to_string(),
760 status: status.to_string(),
761 reason: Some(reason.to_string()),
762 message: Some(message.to_string()),
763 last_transition_time: transition_time,
764 };
765 if let Some(existing) = conditions
766 .iter_mut()
767 .find(|c| c.condition_type == condition_type)
768 {
769 *existing = condition;
770 } else {
771 conditions.push(condition);
772 }
773}
774
775pub async fn update_policy_plan_ref(
777 client: &Client,
778 policy: &PostgresPolicy,
779 plan_name: &str,
780) -> Result<(), ReconcileError> {
781 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
782 let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
783
784 let patch = serde_json::json!({
785 "status": {
786 "current_plan_ref": PlanReference {
787 name: plan_name.to_string(),
788 }
789 }
790 });
791
792 policy_api
793 .patch_status(
794 &policy.name_any(),
795 &PatchParams::apply("pgroles-operator"),
796 &Patch::Merge(&patch),
797 )
798 .await?;
799
800 Ok(())
801}
802
803pub async fn get_current_actionable_plan(
808 client: &Client,
809 policy: &PostgresPolicy,
810) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
811 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
812 let policy_name = policy.name_any();
813
814 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
815 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
816 let existing_plans = plans_api
817 .list(&ListParams::default().labels(&label_selector))
818 .await?;
819
820 let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
822 .into_iter()
823 .filter(|plan| {
824 plan.status
825 .as_ref()
826 .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
827 .unwrap_or(false)
828 })
829 .collect();
830
831 pending_plans.sort_by(|a, b| {
832 let a_time = a.metadata.creation_timestamp.as_ref();
833 let b_time = b.metadata.creation_timestamp.as_ref();
834 b_time.cmp(&a_time) });
836
837 Ok(pending_plans.into_iter().next())
838}
839
840pub async fn get_plan_by_phase(
842 client: &Client,
843 policy: &PostgresPolicy,
844 target_phase: PlanPhase,
845) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
846 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
847 let policy_name = policy.name_any();
848
849 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
850 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
851 let existing_plans = plans_api
852 .list(&ListParams::default().labels(&label_selector))
853 .await?;
854
855 let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
856 .into_iter()
857 .filter(|plan| {
858 plan.status
859 .as_ref()
860 .map(|s| s.phase == target_phase)
861 .unwrap_or(false)
862 })
863 .collect();
864
865 matching_plans.sort_by(|a, b| {
866 let a_time = a.metadata.creation_timestamp.as_ref();
867 let b_time = b.metadata.creation_timestamp.as_ref();
868 b_time.cmp(&a_time) });
870
871 Ok(matching_plans.into_iter().next())
872}
873
874pub async fn mark_plan_failed(
876 client: &Client,
877 plan: &PostgresPolicyPlan,
878 error_message: &str,
879) -> Result<(), ReconcileError> {
880 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
881 let plan_name = plan.name_any();
882 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
883
884 let mut status = plan.status.clone().unwrap_or_default();
885 status.phase = PlanPhase::Failed;
886 status.last_error = Some(error_message.to_string());
887 status.failed_at = Some(crate::crd::now_rfc3339());
888
889 let patch = serde_json::json!({ "status": status });
890 plans_api
891 .patch_status(
892 &plan_name,
893 &PatchParams::apply("pgroles-operator"),
894 &Patch::Merge(&patch),
895 )
896 .await?;
897
898 info!(
899 plan = %plan_name,
900 "marked stuck Applying plan as Failed"
901 );
902
903 Ok(())
904}
905
906pub async fn mark_plan_approved(
911 client: &Client,
912 plan: &PostgresPolicyPlan,
913 reason: &str,
914 message: &str,
915) -> Result<(), ReconcileError> {
916 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
917 let plan_name = plan.name_any();
918 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
919
920 let mut status = plan.status.clone().unwrap_or_default();
921 status.phase = PlanPhase::Approved;
922 set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
923
924 let patch = serde_json::json!({ "status": status });
925 plans_api
926 .patch_status(
927 &plan_name,
928 &PatchParams::apply("pgroles-operator"),
929 &Patch::Merge(&patch),
930 )
931 .await?;
932
933 Ok(())
934}
935
936pub async fn mark_plan_rejected(
938 client: &Client,
939 plan: &PostgresPolicyPlan,
940) -> Result<(), ReconcileError> {
941 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
942 let plan_name = plan.name_any();
943 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
944
945 let mut status = plan.status.clone().unwrap_or_default();
946 status.phase = PlanPhase::Rejected;
947 set_plan_condition(
948 &mut status.conditions,
949 "Approved",
950 "False",
951 "Rejected",
952 "Plan rejected via annotation",
953 );
954
955 let patch = serde_json::json!({ "status": status });
956 plans_api
957 .patch_status(
958 &plan_name,
959 &PatchParams::apply("pgroles-operator"),
960 &Patch::Merge(&patch),
961 )
962 .await?;
963
964 Ok(())
965}
966
967pub async fn mark_plan_superseded(
969 client: &Client,
970 plan: &PostgresPolicyPlan,
971) -> Result<(), ReconcileError> {
972 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
973 let plan_name = plan.name_any();
974 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
975
976 let mut status = plan.status.clone().unwrap_or_default();
977 status.phase = PlanPhase::Superseded;
978 set_plan_condition(
979 &mut status.conditions,
980 "Approved",
981 "False",
982 "Superseded",
983 "Database state changed since plan was approved",
984 );
985
986 let patch = serde_json::json!({ "status": status });
987 plans_api
988 .patch_status(
989 &plan_name,
990 &PatchParams::apply("pgroles-operator"),
991 &Patch::Merge(&patch),
992 )
993 .await?;
994
995 Ok(())
996}
997
998#[cfg(test)]
1003mod tests {
1004 use super::*;
1005 use crate::crd::CrdReconciliationMode;
1006
1007 fn test_plan(
1008 name: &str,
1009 phase: PlanPhase,
1010 annotations: Option<BTreeMap<String, String>>,
1011 ) -> PostgresPolicyPlan {
1012 let mut plan = PostgresPolicyPlan::new(
1013 name,
1014 PostgresPolicyPlanSpec {
1015 policy_ref: PolicyPlanRef {
1016 name: "test-policy".to_string(),
1017 },
1018 policy_generation: 1,
1019 reconciliation_mode: CrdReconciliationMode::Authoritative,
1020 owned_roles: vec!["role-a".to_string()],
1021 owned_schemas: vec!["public".to_string()],
1022 managed_database_identity: "default/db/DATABASE_URL".to_string(),
1023 },
1024 );
1025 plan.metadata.namespace = Some("default".to_string());
1026 plan.metadata.annotations = annotations;
1027 plan.status = Some(PostgresPolicyPlanStatus {
1028 phase,
1029 ..Default::default()
1030 });
1031 plan
1032 }
1033
1034 #[test]
1035 fn check_plan_approval_pending_when_no_annotations() {
1036 let plan = test_plan("plan-1", PlanPhase::Pending, None);
1037 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1038 }
1039
1040 #[test]
1041 fn check_plan_approval_approved_with_annotation() {
1042 let annotations =
1043 BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1044 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1045 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1046 }
1047
1048 #[test]
1049 fn check_plan_approval_rejected_with_annotation() {
1050 let annotations =
1051 BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1052 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1053 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1054 }
1055
1056 #[test]
1057 fn check_plan_approval_rejected_wins_over_approved() {
1058 let annotations = BTreeMap::from([
1059 (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1060 (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1061 ]);
1062 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1063 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1064 }
1065
1066 #[test]
1067 fn check_plan_approval_non_true_value_is_pending() {
1068 let annotations =
1069 BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1070 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1071 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1072 }
1073
1074 #[test]
1075 fn compute_sql_hash_is_deterministic() {
1076 let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1077 let hash1 = compute_sql_hash(sql);
1078 let hash2 = compute_sql_hash(sql);
1079 assert_eq!(hash1, hash2);
1080 assert_eq!(hash1.len(), 64); }
1082
1083 #[test]
1084 fn compute_sql_hash_differs_for_different_sql() {
1085 let hash1 = compute_sql_hash("CREATE ROLE a;");
1086 let hash2 = compute_sql_hash("CREATE ROLE b;");
1087 assert_ne!(hash1, hash2);
1088 }
1089
1090 #[test]
1091 fn generate_plan_name_has_expected_format() {
1092 let name = generate_plan_name("my-policy");
1093 assert!(name.starts_with("my-policy-plan-"));
1094 let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1096 assert_eq!(suffix.len(), 22);
1098 assert_eq!(&suffix[8..9], "-");
1099 assert_eq!(&suffix[15..16], "-");
1100 }
1101
1102 #[test]
1103 fn generate_plan_name_is_unique_across_calls() {
1104 let name1 = generate_plan_name("my-policy");
1105 let name2 = generate_plan_name("my-policy");
1106 assert_ne!(name1, name2);
1109 }
1110
1111 #[test]
1112 fn sanitize_label_value_replaces_slashes() {
1113 let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1114 assert!(!sanitized.contains('/'));
1115 assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1116 }
1117
1118 #[test]
1119 fn sanitize_label_value_truncates_to_63_chars() {
1120 let long_value = "a".repeat(100);
1121 let sanitized = sanitize_label_value(&long_value);
1122 assert!(sanitized.len() <= 63);
1123 }
1124
1125 #[test]
1126 fn render_redacted_sql_masks_passwords() {
1127 let changes = vec![
1128 pgroles_core::diff::Change::CreateRole {
1129 name: "app".to_string(),
1130 state: pgroles_core::model::RoleState {
1131 login: true,
1132 ..pgroles_core::model::RoleState::default()
1133 },
1134 },
1135 pgroles_core::diff::Change::SetPassword {
1136 name: "app".to_string(),
1137 password: "super_secret".to_string(),
1138 },
1139 ];
1140 let ctx = pgroles_core::sql::SqlContext::default();
1141 let redacted = render_redacted_sql(&changes, &ctx);
1142
1143 assert!(redacted.contains("[REDACTED]"));
1144 assert!(!redacted.contains("super_secret"));
1145 assert!(redacted.contains("CREATE ROLE"));
1146 }
1147
1148 #[test]
1149 fn render_full_sql_includes_passwords() {
1150 let changes = vec![pgroles_core::diff::Change::SetPassword {
1151 name: "app".to_string(),
1152 password: "super_secret".to_string(),
1153 }];
1154 let ctx = pgroles_core::sql::SqlContext::default();
1155 let full = render_full_sql(&changes, &ctx);
1156
1157 assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1158 }
1159
1160 #[test]
1161 fn now_epoch_secs_returns_plausible_value() {
1162 let now = now_epoch_secs();
1163 let y2025 = 1_735_689_600_i64;
1165 let y2100 = 4_102_444_800_i64;
1166 assert!(
1167 now > y2025 && now < y2100,
1168 "epoch secs {now} should be between 2025 and 2100"
1169 );
1170 }
1171}