1use std::collections::{BTreeMap, BTreeSet};
8use std::io::Write;
9use std::time::Duration;
10
11use flate2::Compression;
12use flate2::write::GzEncoder;
13use k8s_openapi::ByteString;
14use k8s_openapi::api::core::v1::ConfigMap;
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
16use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams};
17use kube::{Client, Resource, ResourceExt};
18use sha2::{Digest, Sha256};
19use tracing::info;
20
21use crate::crd::{
22 ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_PLAN, LABEL_POLICY,
23 PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
24 PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
25 PostgresPolicyPlanStatus, SqlCompression, SqlRef,
26};
27use crate::reconciler::ReconcileError;
28
29#[derive(Debug, Clone)]
32pub enum PlanCreationResult {
33 Created(String),
35 Deduplicated(String),
37}
38
39impl PlanCreationResult {
40 pub fn plan_name(&self) -> &str {
42 match self {
43 PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
44 }
45 }
46
47 pub fn is_created(&self) -> bool {
49 matches!(self, PlanCreationResult::Created(_))
50 }
51}
52
53const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;
55
56const SQL_CONFIGMAP_GZIP_KEY: &str = "plan.sql.gz";
58
59const MAX_CONFIGMAP_SQL_BYTES: usize = 900 * 1024;
62
63const ORPHAN_GRACE_SECS: i64 = 60;
65
66const CLEANUP_TIMEOUT_SECS: u64 = 5;
68
69const DEFAULT_MAX_PLANS: usize = 10;
71
72const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
76
77#[derive(Debug, Clone, PartialEq, Eq)]
78enum PlanSqlArtifact {
79 Inline(String),
80 CompressedConfigMap {
81 configmap_name: String,
82 key: String,
83 compressed_sql: Vec<u8>,
84 },
85 TruncatedInline(String),
86}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
89struct PreparedPlanSql {
90 artifact: PlanSqlArtifact,
91 redacted_sql_hash: String,
92 original_bytes: usize,
93 stored_bytes: usize,
94}
95
96impl PreparedPlanSql {
97 fn sql_ref(&self) -> Option<SqlRef> {
98 match &self.artifact {
99 PlanSqlArtifact::CompressedConfigMap {
100 configmap_name,
101 key,
102 ..
103 } => Some(SqlRef {
104 name: configmap_name.clone(),
105 key: key.clone(),
106 compression: Some(SqlCompression::Gzip),
107 }),
108 PlanSqlArtifact::Inline(_) | PlanSqlArtifact::TruncatedInline(_) => None,
109 }
110 }
111
112 fn sql_inline(&self) -> Option<String> {
113 match &self.artifact {
114 PlanSqlArtifact::Inline(sql) | PlanSqlArtifact::TruncatedInline(sql) => {
115 Some(sql.clone())
116 }
117 PlanSqlArtifact::CompressedConfigMap { .. } => None,
118 }
119 }
120
121 fn is_truncated(&self) -> bool {
122 matches!(self.artifact, PlanSqlArtifact::TruncatedInline(_))
123 }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
132pub enum PlanApprovalState {
133 Pending,
134 Approved,
135 Rejected,
136}
137
138pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
143 let annotations = plan.metadata.annotations.as_ref();
144
145 let rejected = annotations
146 .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
147 .map(|v| v == "true")
148 .unwrap_or(false);
149
150 if rejected {
151 return PlanApprovalState::Rejected;
152 }
153
154 let approved = annotations
155 .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
156 .map(|v| v == "true")
157 .unwrap_or(false);
158
159 if approved {
160 return PlanApprovalState::Approved;
161 }
162
163 PlanApprovalState::Pending
164}
165
166#[allow(clippy::too_many_arguments)]
183pub async fn create_or_update_plan(
184 client: &Client,
185 policy: &PostgresPolicy,
186 changes: &[pgroles_core::diff::Change],
187 sql_context: &pgroles_core::sql::SqlContext,
188 inspect_config: &pgroles_inspect::InspectConfig,
189 reconciliation_mode: CrdReconciliationMode,
190 database_identity: &str,
191 change_summary: &ChangeSummary,
192) -> Result<PlanCreationResult, ReconcileError> {
193 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
194 let policy_name = policy.name_any();
195 let generation = policy.metadata.generation.unwrap_or(0);
196
197 let full_sql = render_full_sql(changes, sql_context);
199
200 let sql_hash = compute_sql_hash(&full_sql);
202
203 let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;
205
206 let redacted_sql = render_redacted_sql(changes, sql_context);
208
209 cleanup_old_plans_best_effort(client, policy, None).await;
210
211 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
212
213 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
215 let existing_plans = plans_api
216 .list(&ListParams::default().labels(&label_selector))
217 .await?;
218
219 for plan in &existing_plans {
221 if let Some(ref status) = plan.status
222 && status.phase == PlanPhase::Pending
223 && status.sql_hash.as_deref() == Some(&sql_hash)
224 {
225 let plan_name = plan.name_any();
227 info!(
228 plan = %plan_name,
229 policy = %policy_name,
230 "existing pending plan has identical SQL hash, skipping creation"
231 );
232 return Ok(PlanCreationResult::Deduplicated(plan_name));
233 }
234 }
235
236 let now_ts = now_epoch_secs();
244 for plan in &existing_plans {
245 if let Some(ref status) = plan.status
246 && status.phase == PlanPhase::Failed
247 && status.sql_hash.as_deref() == Some(&sql_hash)
248 {
249 let failed_ts = status
250 .failed_at
251 .as_deref()
252 .and_then(parse_rfc3339_epoch_secs)
253 .unwrap_or(0);
254 if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
255 let plan_name = plan.name_any();
256 info!(
257 plan = %plan_name,
258 policy = %policy_name,
259 age_secs = now_ts - failed_ts,
260 "recently-failed plan has identical SQL hash, skipping creation"
261 );
262 return Ok(PlanCreationResult::Deduplicated(plan_name));
263 }
264 }
265 }
266
267 let plan_name = generate_plan_name(&policy_name, &sql_hash);
270 let prepared_sql = prepare_plan_sql(&plan_name, &redacted_sql)?;
271
272 let sql_configmap_name = create_plan_sql_configmap(
274 client,
275 policy,
276 &namespace,
277 &policy_name,
278 database_identity,
279 &prepared_sql,
280 )
281 .await?;
282
283 let owner_ref = build_owner_reference(policy);
285
286 let plan = PostgresPolicyPlan::new(
288 &plan_name,
289 PostgresPolicyPlanSpec {
290 policy_ref: PolicyPlanRef {
291 name: policy_name.clone(),
292 },
293 policy_generation: generation,
294 reconciliation_mode,
295 owned_roles: inspect_config.managed_roles.clone(),
296 owned_schemas: inspect_config.managed_schemas.clone(),
297 managed_database_identity: database_identity.to_string(),
298 },
299 );
300 let mut plan = plan;
301 plan.metadata.namespace = Some(namespace.clone());
302 plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
303 plan.metadata.labels = Some(BTreeMap::from([
304 (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
305 (
306 LABEL_DATABASE_IDENTITY.to_string(),
307 sanitize_label_value(database_identity),
308 ),
309 ]));
310
311 let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
313 let summary_text = format!(
314 "{}R {}G {}D {}DP {}M",
315 change_summary.roles_created + change_summary.roles_altered,
316 change_summary.grants_added,
317 change_summary.default_privileges_set,
318 change_summary.roles_dropped,
319 change_summary.members_added,
320 );
321 plan.metadata.annotations = Some(BTreeMap::from([
322 ("pgroles.io/sql-preview".to_string(), sql_preview),
323 ("pgroles.io/summary".to_string(), summary_text),
324 (
325 "pgroles.io/sql-hash".to_string(),
326 sql_hash[..12].to_string(),
327 ),
328 (
329 "pgroles.io/redacted-sql-hash".to_string(),
330 prepared_sql.redacted_sql_hash[..12].to_string(),
331 ),
332 (
333 "pgroles.io/sql-original-bytes".to_string(),
334 prepared_sql.original_bytes.to_string(),
335 ),
336 (
337 "pgroles.io/sql-stored-bytes".to_string(),
338 prepared_sql.stored_bytes.to_string(),
339 ),
340 ]));
341
342 let (created_plan, created_new_plan) =
343 match plans_api.create(&PostParams::default(), &plan).await {
344 Ok(plan) => (plan, true),
345 Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
346 let existing = plans_api.get(&plan_name).await?;
347 if !should_patch_existing_plan_status(&existing) {
348 return Ok(PlanCreationResult::Deduplicated(existing.name_any()));
349 }
350 (existing, false)
351 }
352 Err(err) => {
353 if let Some(configmap_name) = sql_configmap_name.as_deref() {
354 delete_configmap_best_effort(client, &namespace, configmap_name).await;
355 }
356 return Err(err.into());
357 }
358 };
359 let plan_name = created_plan.name_any();
360
361 let computed_message = if prepared_sql.is_truncated() {
363 format!(
364 "Plan computed with {} change(s); SQL preview truncated because compressed SQL exceeded Kubernetes ConfigMap limits",
365 change_summary.total
366 )
367 } else {
368 format!("Plan computed with {} change(s)", change_summary.total)
369 };
370 let plan_status = PostgresPolicyPlanStatus {
371 phase: PlanPhase::Pending,
372 conditions: vec![
373 PolicyCondition {
374 condition_type: "Computed".to_string(),
375 status: "True".to_string(),
376 reason: Some("PlanComputed".to_string()),
377 message: Some(computed_message),
378 last_transition_time: Some(crate::crd::now_rfc3339()),
379 },
380 PolicyCondition {
381 condition_type: "Approved".to_string(),
382 status: "False".to_string(),
383 reason: Some("PendingApproval".to_string()),
384 message: Some("Plan awaiting approval".to_string()),
385 last_transition_time: Some(crate::crd::now_rfc3339()),
386 },
387 ],
388 change_summary: Some(change_summary.clone()),
389 sql_ref: prepared_sql.sql_ref(),
390 sql_inline: prepared_sql.sql_inline(),
391 sql_truncated: prepared_sql.is_truncated(),
392 computed_at: Some(crate::crd::now_rfc3339()),
393 applied_at: None,
394 last_error: None,
395 sql_hash: Some(sql_hash),
396 applying_since: None,
397 failed_at: None,
398 sql_statements: Some(sql_statement_count),
399 redacted_sql_hash: Some(prepared_sql.redacted_sql_hash.clone()),
400 sql_original_bytes: Some(prepared_sql.original_bytes as i64),
401 sql_stored_bytes: Some(prepared_sql.stored_bytes as i64),
402 };
403
404 let status_patch = serde_json::json!({ "status": plan_status });
405 if let Err(err) = plans_api
406 .patch_status(
407 &plan_name,
408 &PatchParams::apply("pgroles-operator"),
409 &Patch::Merge(&status_patch),
410 )
411 .await
412 {
413 if created_new_plan {
414 delete_plan_best_effort(&plans_api, &plan_name).await;
415 }
416 if let Some(configmap_name) = sql_configmap_name.as_deref() {
417 delete_configmap_best_effort(client, &namespace, configmap_name).await;
418 }
419 return Err(err.into());
420 }
421
422 for plan in &existing_plans {
426 if let Some(ref status) = plan.status
427 && status.phase == PlanPhase::Pending
428 && plan.name_any() != plan_name
429 {
430 let old_plan_name = plan.name_any();
431 info!(
432 plan = %old_plan_name,
433 policy = %policy_name,
434 "marking existing pending plan as Superseded"
435 );
436 let superseded_status = PostgresPolicyPlanStatus {
437 phase: PlanPhase::Superseded,
438 ..status.clone()
439 };
440 let patch = serde_json::json!({ "status": superseded_status });
441 plans_api
442 .patch_status(
443 &old_plan_name,
444 &PatchParams::apply("pgroles-operator"),
445 &Patch::Merge(&patch),
446 )
447 .await?;
448 }
449 }
450
451 info!(
452 plan = %plan_name,
453 policy = %policy_name,
454 changes = change_summary.total,
455 "created new plan"
456 );
457
458 Ok(PlanCreationResult::Created(plan_name))
459}
460
461pub async fn execute_plan(
472 client: &Client,
473 plan: &PostgresPolicyPlan,
474 pool: &sqlx::PgPool,
475 sql_context: &pgroles_core::sql::SqlContext,
476 changes: &[pgroles_core::diff::Change],
477) -> Result<(), ReconcileError> {
478 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
479 let plan_name = plan.name_any();
480 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
481
482 update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;
484
485 let result = execute_changes_in_transaction(pool, changes, sql_context).await;
489
490 match result {
491 Ok(statements_executed) => {
492 let mut applied_status = plan.status.clone().unwrap_or_default();
494 applied_status.phase = PlanPhase::Applied;
495 applied_status.applied_at = Some(crate::crd::now_rfc3339());
496 applied_status.last_error = None;
497 set_plan_condition(
498 &mut applied_status.conditions,
499 "Approved",
500 "True",
501 "Approved",
502 "Plan approved and executed",
503 );
504
505 let patch = serde_json::json!({ "status": applied_status });
506 plans_api
507 .patch_status(
508 &plan_name,
509 &PatchParams::apply("pgroles-operator"),
510 &Patch::Merge(&patch),
511 )
512 .await?;
513
514 info!(
515 plan = %plan_name,
516 statements = statements_executed,
517 "plan executed successfully"
518 );
519 Ok(())
520 }
521 Err(err) => {
522 let error_message = err.to_string();
524 let mut failed_status = plan.status.clone().unwrap_or_default();
525 failed_status.phase = PlanPhase::Failed;
526 failed_status.last_error = Some(error_message);
527 failed_status.failed_at = Some(crate::crd::now_rfc3339());
528
529 let patch = serde_json::json!({ "status": failed_status });
530 if let Err(status_err) = plans_api
531 .patch_status(
532 &plan_name,
533 &PatchParams::apply("pgroles-operator"),
534 &Patch::Merge(&patch),
535 )
536 .await
537 {
538 tracing::warn!(
539 plan = %plan_name,
540 %status_err,
541 "failed to update plan status to Failed"
542 );
543 }
544
545 Err(err)
546 }
547 }
548}
549
550fn prepare_plan_sql(
551 plan_name: &str,
552 redacted_sql: &str,
553) -> Result<PreparedPlanSql, ReconcileError> {
554 let original_bytes = redacted_sql.len();
555 let redacted_sql_hash = compute_sql_hash(redacted_sql);
556
557 if original_bytes <= MAX_INLINE_SQL_BYTES {
558 return Ok(PreparedPlanSql {
559 artifact: PlanSqlArtifact::Inline(redacted_sql.to_string()),
560 redacted_sql_hash,
561 original_bytes,
562 stored_bytes: original_bytes,
563 });
564 }
565
566 let compressed_sql = gzip_bytes(redacted_sql.as_bytes())?;
567 if compressed_sql.len() <= MAX_CONFIGMAP_SQL_BYTES {
568 let stored_bytes = compressed_sql.len();
569 return Ok(PreparedPlanSql {
570 artifact: PlanSqlArtifact::CompressedConfigMap {
571 configmap_name: format!("{plan_name}-sql"),
572 key: SQL_CONFIGMAP_GZIP_KEY.to_string(),
573 compressed_sql,
574 },
575 redacted_sql_hash,
576 original_bytes,
577 stored_bytes,
578 });
579 }
580
581 let truncated = truncate_utf8(
582 redacted_sql,
583 MAX_INLINE_SQL_BYTES,
584 "\n-- truncated: compressed SQL preview exceeded Kubernetes ConfigMap limits --",
585 );
586 let stored_bytes = truncated.len();
587 Ok(PreparedPlanSql {
588 artifact: PlanSqlArtifact::TruncatedInline(truncated),
589 redacted_sql_hash,
590 original_bytes,
591 stored_bytes,
592 })
593}
594
595fn gzip_bytes(bytes: &[u8]) -> Result<Vec<u8>, ReconcileError> {
596 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
597 encoder
598 .write_all(bytes)
599 .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))?;
600 encoder
601 .finish()
602 .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))
603}
604
605fn truncate_utf8(text: &str, max_bytes: usize, marker: &str) -> String {
606 if text.len() <= max_bytes {
607 return text.to_string();
608 }
609
610 let target_len = max_bytes.saturating_sub(marker.len());
611 let mut end = target_len.min(text.len());
612 while end > 0 && !text.is_char_boundary(end) {
613 end -= 1;
614 }
615
616 let mut truncated = text[..end].to_string();
617 truncated.push_str(marker);
618 truncated
619}
620
621async fn create_plan_sql_configmap(
622 client: &Client,
623 policy: &PostgresPolicy,
624 namespace: &str,
625 policy_name: &str,
626 database_identity: &str,
627 prepared_sql: &PreparedPlanSql,
628) -> Result<Option<String>, ReconcileError> {
629 let PlanSqlArtifact::CompressedConfigMap {
630 configmap_name,
631 key: _,
632 compressed_sql: _,
633 } = &prepared_sql.artifact
634 else {
635 return Ok(None);
636 };
637
638 let configmap = build_plan_sql_configmap_object(
639 policy,
640 namespace,
641 policy_name,
642 database_identity,
643 prepared_sql,
644 )?;
645
646 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
647 match configmaps_api
648 .create(&PostParams::default(), &configmap)
649 .await
650 {
651 Ok(_) => Ok(Some(configmap_name.clone())),
652 Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
653 let existing = configmaps_api.get(configmap_name).await?;
654 validate_existing_sql_configmap(&existing, prepared_sql)?;
655 Ok(Some(configmap_name.clone()))
656 }
657 Err(err) => Err(err.into()),
658 }
659}
660
661fn build_plan_sql_configmap_object(
662 policy: &PostgresPolicy,
663 namespace: &str,
664 policy_name: &str,
665 database_identity: &str,
666 prepared_sql: &PreparedPlanSql,
667) -> Result<ConfigMap, ReconcileError> {
668 let PlanSqlArtifact::CompressedConfigMap {
669 configmap_name,
670 key,
671 compressed_sql,
672 } = &prepared_sql.artifact
673 else {
674 return Err(ReconcileError::PlanSqlStorage(
675 "cannot build ConfigMap for inline plan SQL".to_string(),
676 ));
677 };
678
679 Ok(ConfigMap {
680 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
681 name: Some(configmap_name.clone()),
682 namespace: Some(namespace.to_string()),
683 owner_references: Some(vec![build_owner_reference(policy)]),
684 labels: Some(BTreeMap::from([
685 (LABEL_POLICY.to_string(), sanitize_label_value(policy_name)),
686 (
687 LABEL_DATABASE_IDENTITY.to_string(),
688 sanitize_label_value(database_identity),
689 ),
690 (
691 LABEL_PLAN.to_string(),
692 plan_label_value(configmap_plan_name(configmap_name)),
693 ),
694 ])),
695 annotations: Some(BTreeMap::from([
696 ("pgroles.io/sql-compression".to_string(), "gzip".to_string()),
697 (
698 "pgroles.io/redacted-sql-hash".to_string(),
699 prepared_sql.redacted_sql_hash.clone(),
700 ),
701 (
702 "pgroles.io/sql-original-bytes".to_string(),
703 prepared_sql.original_bytes.to_string(),
704 ),
705 (
706 "pgroles.io/sql-stored-bytes".to_string(),
707 prepared_sql.stored_bytes.to_string(),
708 ),
709 ])),
710 ..Default::default()
711 },
712 binary_data: Some(BTreeMap::from([(
713 key.clone(),
714 ByteString(compressed_sql.clone()),
715 )])),
716 ..Default::default()
717 })
718}
719
720fn configmap_plan_name(configmap_name: &str) -> &str {
721 configmap_name
722 .strip_suffix("-sql")
723 .unwrap_or(configmap_name)
724}
725
726fn plan_label_value(plan_name: &str) -> String {
727 compute_sql_hash(plan_name)[..32].to_string()
728}
729
730fn validate_existing_sql_configmap(
731 configmap: &ConfigMap,
732 prepared_sql: &PreparedPlanSql,
733) -> Result<(), ReconcileError> {
734 let Some(annotations) = configmap.metadata.annotations.as_ref() else {
735 return Err(ReconcileError::PlanSqlStorage(format!(
736 "existing ConfigMap {} is missing SQL storage annotations",
737 configmap.name_any()
738 )));
739 };
740 let hash_matches = annotations
741 .get("pgroles.io/redacted-sql-hash")
742 .map(|hash| hash == &prepared_sql.redacted_sql_hash)
743 .unwrap_or(false);
744 if hash_matches {
745 Ok(())
746 } else {
747 Err(ReconcileError::PlanSqlStorage(format!(
748 "existing ConfigMap {} does not match computed SQL preview hash",
749 configmap.name_any()
750 )))
751 }
752}
753
754async fn execute_changes_in_transaction(
758 pool: &sqlx::PgPool,
759 changes: &[pgroles_core::diff::Change],
760 sql_context: &pgroles_core::sql::SqlContext,
761) -> Result<usize, ReconcileError> {
762 let mut transaction = pool.begin().await?;
763 let mut statements_executed = 0usize;
764
765 for change in changes {
766 let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
767 for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
768 if is_sensitive {
769 tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
770 } else {
771 tracing::debug!(%sql, "executing");
772 }
773 sqlx::query(&sql).execute(transaction.as_mut()).await?;
774 statements_executed += 1;
775 }
776 }
777
778 transaction.commit().await?;
779 Ok(statements_executed)
780}
781
782pub async fn cleanup_old_plans_best_effort(
789 client: &Client,
790 policy: &PostgresPolicy,
791 max_plans: Option<usize>,
792) {
793 match tokio::time::timeout(
794 Duration::from_secs(CLEANUP_TIMEOUT_SECS),
795 cleanup_old_plans(client, policy, max_plans),
796 )
797 .await
798 {
799 Ok(Ok(())) => {}
800 Ok(Err(err)) => tracing::warn!(%err, "failed to clean up old plans"),
801 Err(_) => tracing::warn!(
802 timeout_secs = CLEANUP_TIMEOUT_SECS,
803 "timed out cleaning up old plans"
804 ),
805 }
806}
807
808pub async fn cleanup_old_plans(
814 client: &Client,
815 policy: &PostgresPolicy,
816 max_plans: Option<usize>,
817) -> Result<(), ReconcileError> {
818 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
819 let policy_name = policy.name_any();
820 let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);
821
822 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
823 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
824 let existing_plans = plans_api
825 .list(&ListParams::default().labels(&label_selector))
826 .await?;
827 let now_ts = now_epoch_secs();
828
829 for plan in existing_plans
830 .iter()
831 .filter(|plan| is_stale_statusless_plan(plan, now_ts))
832 {
833 let plan_name = plan.name_any();
834 info!(
835 plan = %plan_name,
836 policy = %policy_name,
837 "cleaning up stale status-less plan"
838 );
839 if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
840 tracing::warn!(
841 plan = %plan_name,
842 %err,
843 "failed to delete stale status-less plan during cleanup"
844 );
845 }
846 }
847
848 let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
850 .iter()
851 .filter(|plan| {
852 plan.status
853 .as_ref()
854 .map(|s| {
855 matches!(
856 s.phase,
857 PlanPhase::Applied
858 | PlanPhase::Failed
859 | PlanPhase::Superseded
860 | PlanPhase::Rejected
861 )
862 })
863 .unwrap_or(false)
864 })
865 .collect();
866
867 if terminal_plans.len() > max_plans {
868 terminal_plans.sort_by(|a, b| {
870 let a_time = a.metadata.creation_timestamp.as_ref();
871 let b_time = b.metadata.creation_timestamp.as_ref();
872 a_time.cmp(&b_time)
873 });
874
875 let plans_to_delete = terminal_plans.len() - max_plans;
876 for plan in terminal_plans.into_iter().take(plans_to_delete) {
877 let plan_name = plan.name_any();
878 info!(
879 plan = %plan_name,
880 policy = %policy_name,
881 "cleaning up old plan"
882 );
883 if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
884 tracing::warn!(
885 plan = %plan_name,
886 %err,
887 "failed to delete old plan during cleanup"
888 );
889 }
890 }
891 }
892
893 cleanup_orphan_sql_configmaps(
894 client,
895 &namespace,
896 &policy_name,
897 &existing_plans.items,
898 now_ts,
899 )
900 .await?;
901
902 Ok(())
903}
904
905async fn cleanup_orphan_sql_configmaps(
910 client: &Client,
911 namespace: &str,
912 policy_name: &str,
913 existing_plans: &[PostgresPolicyPlan],
914 now_ts: i64,
915) -> Result<(), ReconcileError> {
916 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
917 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(policy_name));
918 let configmaps = configmaps_api
919 .list(&ListParams::default().labels(&label_selector))
920 .await?;
921 let known_plan_labels: BTreeSet<String> = existing_plans
922 .iter()
923 .map(|plan| plan_label_value(&plan.name_any()))
924 .collect();
925 let known_plan_names: BTreeSet<String> =
926 existing_plans.iter().map(ResourceExt::name_any).collect();
927
928 for configmap in configmaps {
929 if !is_orphan_sql_configmap(&configmap, &known_plan_names, &known_plan_labels, now_ts) {
930 continue;
931 }
932
933 let configmap_name = configmap.name_any();
934 info!(
935 configmap = %configmap_name,
936 policy = %policy_name,
937 "cleaning up orphan plan SQL ConfigMap"
938 );
939 if let Err(err) = configmaps_api
940 .delete(&configmap_name, &DeleteParams::default())
941 .await
942 {
943 tracing::warn!(
944 configmap = %configmap_name,
945 %err,
946 "failed to delete orphan plan SQL ConfigMap during cleanup"
947 );
948 }
949 }
950
951 Ok(())
952}
953
954fn is_orphan_sql_configmap(
955 configmap: &ConfigMap,
956 known_plan_names: &BTreeSet<String>,
957 known_plan_labels: &BTreeSet<String>,
958 now_ts: i64,
959) -> bool {
960 let Some(labels) = configmap.metadata.labels.as_ref() else {
961 return false;
962 };
963 if !labels.contains_key(LABEL_POLICY) || !is_stale_object(configmap, now_ts) {
964 return false;
965 }
966 if known_plan_names.contains(configmap_plan_name(&configmap.name_any())) {
967 return false;
968 }
969 labels
970 .get(LABEL_PLAN)
971 .map(|plan_label| !known_plan_labels.contains(plan_label))
972 .unwrap_or(true)
973}
974
975fn should_patch_existing_plan_status(plan: &PostgresPolicyPlan) -> bool {
976 plan.status
977 .as_ref()
978 .map(|status| status.phase == PlanPhase::Pending)
979 .unwrap_or(true)
980}
981
982fn is_stale_statusless_plan(plan: &PostgresPolicyPlan, now_ts: i64) -> bool {
983 plan.status.is_none() && is_stale_object(plan, now_ts)
984}
985
986fn is_stale_object<K>(resource: &K, now_ts: i64) -> bool
987where
988 K: Resource,
989{
990 resource
991 .meta()
992 .creation_timestamp
993 .as_ref()
994 .map(|timestamp| now_ts.saturating_sub(timestamp.0.as_second()) > ORPHAN_GRACE_SECS)
995 .unwrap_or(false)
996}
997
998async fn delete_plan_best_effort(plans_api: &Api<PostgresPolicyPlan>, plan_name: &str) {
999 if let Err(err) = plans_api.delete(plan_name, &DeleteParams::default()).await {
1000 tracing::warn!(
1001 plan = %plan_name,
1002 %err,
1003 "failed to roll back plan after status update failure"
1004 );
1005 }
1006}
1007
1008async fn delete_configmap_best_effort(client: &Client, namespace: &str, configmap_name: &str) {
1009 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
1010 if let Err(err) = configmaps_api
1011 .delete(configmap_name, &DeleteParams::default())
1012 .await
1013 {
1014 tracing::warn!(
1015 configmap = %configmap_name,
1016 %err,
1017 "failed to roll back plan SQL ConfigMap"
1018 );
1019 }
1020}
1021
1022pub(crate) fn render_full_sql(
1024 changes: &[pgroles_core::diff::Change],
1025 sql_context: &pgroles_core::sql::SqlContext,
1026) -> String {
1027 changes
1028 .iter()
1029 .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
1030 .collect::<Vec<_>>()
1031 .join("\n")
1032}
1033
1034fn render_redacted_sql(
1036 changes: &[pgroles_core::diff::Change],
1037 sql_context: &pgroles_core::sql::SqlContext,
1038) -> String {
1039 changes
1040 .iter()
1041 .flat_map(|change| {
1042 if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
1043 vec![format!(
1044 "ALTER ROLE {} PASSWORD '[REDACTED]';",
1045 pgroles_core::sql::quote_ident(name)
1046 )]
1047 } else {
1048 pgroles_core::sql::render_statements_with_context(change, sql_context)
1049 }
1050 })
1051 .collect::<Vec<_>>()
1052 .join("\n")
1053}
1054
1055pub(crate) fn compute_sql_hash(sql: &str) -> String {
1057 use std::fmt::Write as _;
1058
1059 let mut hasher = Sha256::new();
1060 hasher.update(sql.as_bytes());
1061 let digest = hasher.finalize();
1062 let mut hex = String::with_capacity(digest.len() * 2);
1063 for byte in digest {
1064 write!(&mut hex, "{byte:02x}").expect("writing to a string should succeed");
1065 }
1066 hex
1067}
1068
1069fn generate_plan_name(policy_name: &str, sql_hash: &str) -> String {
1076 let timestamp = format_timestamp_compact();
1077 let suffix = &sql_hash[..12.min(sql_hash.len())];
1078 let max_name_len = 253 - 4; let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
1082 let prefix = if policy_name.len() > max_prefix_len {
1083 policy_name
1084 .char_indices()
1085 .take_while(|(idx, ch)| idx + ch.len_utf8() <= max_prefix_len)
1086 .map(|(_, ch)| ch)
1087 .collect::<String>()
1088 } else {
1089 policy_name.to_string()
1090 };
1091 format!("{prefix}-plan-{timestamp}-{suffix}")
1092}
1093
1094fn format_timestamp_compact() -> String {
1096 use std::time::SystemTime;
1097 let now = SystemTime::now()
1098 .duration_since(SystemTime::UNIX_EPOCH)
1099 .unwrap_or_default();
1100 let secs = now.as_secs();
1101 let (year, month, day) = crate::crd::days_to_date(secs / 86400);
1102 let remaining = secs % 86400;
1103 let hours = remaining / 3600;
1104 let minutes = (remaining % 3600) / 60;
1105 let seconds = remaining % 60;
1106 format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
1107}
1108
1109fn sanitize_label_value(value: &str) -> String {
1113 let sanitized: String = value
1114 .chars()
1115 .map(|c| {
1116 if c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_' {
1117 c
1118 } else {
1119 '_'
1120 }
1121 })
1122 .take(63)
1123 .collect();
1124 sanitized
1125}
1126
1127fn now_epoch_secs() -> i64 {
1129 std::time::SystemTime::now()
1130 .duration_since(std::time::UNIX_EPOCH)
1131 .unwrap_or_default()
1132 .as_secs() as i64
1133}
1134
1135fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
1138 rfc3339
1140 .parse::<jiff::Timestamp>()
1141 .ok()
1142 .map(|t| t.as_second())
1143}
1144
1145fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
1147 OwnerReference {
1148 api_version: PostgresPolicy::api_version(&()).to_string(),
1149 kind: PostgresPolicy::kind(&()).to_string(),
1150 name: policy.name_any(),
1151 uid: policy.metadata.uid.clone().unwrap_or_default(),
1152 controller: Some(true),
1153 block_owner_deletion: Some(true),
1154 }
1155}
1156
1157async fn update_plan_phase(
1162 plans_api: &Api<PostgresPolicyPlan>,
1163 plan_name: &str,
1164 phase: PlanPhase,
1165) -> Result<(), ReconcileError> {
1166 let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
1167 if phase == PlanPhase::Applying {
1168 patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
1169 }
1170 plans_api
1171 .patch_status(
1172 plan_name,
1173 &PatchParams::apply("pgroles-operator"),
1174 &Patch::Merge(&patch_value),
1175 )
1176 .await?;
1177 Ok(())
1178}
1179
1180fn set_plan_condition(
1185 conditions: &mut Vec<PolicyCondition>,
1186 condition_type: &str,
1187 status: &str,
1188 reason: &str,
1189 message: &str,
1190) {
1191 let transition_time = if let Some(existing) = conditions
1192 .iter()
1193 .find(|c| c.condition_type == condition_type)
1194 {
1195 if existing.status == status {
1196 existing.last_transition_time.clone()
1197 } else {
1198 Some(crate::crd::now_rfc3339())
1199 }
1200 } else {
1201 Some(crate::crd::now_rfc3339())
1202 };
1203
1204 let condition = PolicyCondition {
1205 condition_type: condition_type.to_string(),
1206 status: status.to_string(),
1207 reason: Some(reason.to_string()),
1208 message: Some(message.to_string()),
1209 last_transition_time: transition_time,
1210 };
1211 if let Some(existing) = conditions
1212 .iter_mut()
1213 .find(|c| c.condition_type == condition_type)
1214 {
1215 *existing = condition;
1216 } else {
1217 conditions.push(condition);
1218 }
1219}
1220
1221pub async fn update_policy_plan_ref(
1223 client: &Client,
1224 policy: &PostgresPolicy,
1225 plan_name: &str,
1226) -> Result<(), ReconcileError> {
1227 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1228 let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);
1229
1230 let patch = serde_json::json!({
1231 "status": {
1232 "current_plan_ref": PlanReference {
1233 name: plan_name.to_string(),
1234 }
1235 }
1236 });
1237
1238 policy_api
1239 .patch_status(
1240 &policy.name_any(),
1241 &PatchParams::apply("pgroles-operator"),
1242 &Patch::Merge(&patch),
1243 )
1244 .await?;
1245
1246 Ok(())
1247}
1248
1249pub async fn get_current_actionable_plan(
1254 client: &Client,
1255 policy: &PostgresPolicy,
1256) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1257 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1258 let policy_name = policy.name_any();
1259
1260 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1261 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1262 let existing_plans = plans_api
1263 .list(&ListParams::default().labels(&label_selector))
1264 .await?;
1265
1266 let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
1268 .into_iter()
1269 .filter(|plan| {
1270 plan.status
1271 .as_ref()
1272 .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
1273 .unwrap_or(false)
1274 })
1275 .collect();
1276
1277 pending_plans.sort_by(|a, b| {
1278 let a_time = a.metadata.creation_timestamp.as_ref();
1279 let b_time = b.metadata.creation_timestamp.as_ref();
1280 b_time.cmp(&a_time) });
1282
1283 Ok(pending_plans.into_iter().next())
1284}
1285
1286pub async fn get_plan_by_phase(
1288 client: &Client,
1289 policy: &PostgresPolicy,
1290 target_phase: PlanPhase,
1291) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1292 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1293 let policy_name = policy.name_any();
1294
1295 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1296 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1297 let existing_plans = plans_api
1298 .list(&ListParams::default().labels(&label_selector))
1299 .await?;
1300
1301 let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
1302 .into_iter()
1303 .filter(|plan| {
1304 plan.status
1305 .as_ref()
1306 .map(|s| s.phase == target_phase)
1307 .unwrap_or(false)
1308 })
1309 .collect();
1310
1311 matching_plans.sort_by(|a, b| {
1312 let a_time = a.metadata.creation_timestamp.as_ref();
1313 let b_time = b.metadata.creation_timestamp.as_ref();
1314 b_time.cmp(&a_time) });
1316
1317 Ok(matching_plans.into_iter().next())
1318}
1319
1320pub async fn mark_plan_failed(
1322 client: &Client,
1323 plan: &PostgresPolicyPlan,
1324 error_message: &str,
1325) -> Result<(), ReconcileError> {
1326 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1327 let plan_name = plan.name_any();
1328 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1329
1330 let mut status = plan.status.clone().unwrap_or_default();
1331 status.phase = PlanPhase::Failed;
1332 status.last_error = Some(error_message.to_string());
1333 status.failed_at = Some(crate::crd::now_rfc3339());
1334
1335 let patch = serde_json::json!({ "status": status });
1336 plans_api
1337 .patch_status(
1338 &plan_name,
1339 &PatchParams::apply("pgroles-operator"),
1340 &Patch::Merge(&patch),
1341 )
1342 .await?;
1343
1344 info!(
1345 plan = %plan_name,
1346 "marked stuck Applying plan as Failed"
1347 );
1348
1349 Ok(())
1350}
1351
1352pub async fn mark_plan_approved(
1357 client: &Client,
1358 plan: &PostgresPolicyPlan,
1359 reason: &str,
1360 message: &str,
1361) -> Result<(), ReconcileError> {
1362 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1363 let plan_name = plan.name_any();
1364 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1365
1366 let mut status = plan.status.clone().unwrap_or_default();
1367 status.phase = PlanPhase::Approved;
1368 set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
1369
1370 let patch = serde_json::json!({ "status": status });
1371 plans_api
1372 .patch_status(
1373 &plan_name,
1374 &PatchParams::apply("pgroles-operator"),
1375 &Patch::Merge(&patch),
1376 )
1377 .await?;
1378
1379 Ok(())
1380}
1381
1382pub async fn mark_plan_rejected(
1384 client: &Client,
1385 plan: &PostgresPolicyPlan,
1386) -> Result<(), ReconcileError> {
1387 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1388 let plan_name = plan.name_any();
1389 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1390
1391 let mut status = plan.status.clone().unwrap_or_default();
1392 status.phase = PlanPhase::Rejected;
1393 set_plan_condition(
1394 &mut status.conditions,
1395 "Approved",
1396 "False",
1397 "Rejected",
1398 "Plan rejected via annotation",
1399 );
1400
1401 let patch = serde_json::json!({ "status": status });
1402 plans_api
1403 .patch_status(
1404 &plan_name,
1405 &PatchParams::apply("pgroles-operator"),
1406 &Patch::Merge(&patch),
1407 )
1408 .await?;
1409
1410 Ok(())
1411}
1412
1413pub async fn mark_plan_superseded(
1415 client: &Client,
1416 plan: &PostgresPolicyPlan,
1417) -> Result<(), ReconcileError> {
1418 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1419 let plan_name = plan.name_any();
1420 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1421
1422 let mut status = plan.status.clone().unwrap_or_default();
1423 status.phase = PlanPhase::Superseded;
1424 set_plan_condition(
1425 &mut status.conditions,
1426 "Approved",
1427 "False",
1428 "Superseded",
1429 "Database state changed since plan was approved",
1430 );
1431
1432 let patch = serde_json::json!({ "status": status });
1433 plans_api
1434 .patch_status(
1435 &plan_name,
1436 &PatchParams::apply("pgroles-operator"),
1437 &Patch::Merge(&patch),
1438 )
1439 .await?;
1440
1441 Ok(())
1442}
1443
1444#[cfg(test)]
1449mod tests {
1450 use super::*;
1451 use crate::crd::CrdReconciliationMode;
1452 use base64::Engine as _;
1453 use flate2::read::GzDecoder;
1454 use std::io::Read;
1455
1456 fn test_plan(
1457 name: &str,
1458 phase: PlanPhase,
1459 annotations: Option<BTreeMap<String, String>>,
1460 ) -> PostgresPolicyPlan {
1461 let mut plan = PostgresPolicyPlan::new(
1462 name,
1463 PostgresPolicyPlanSpec {
1464 policy_ref: PolicyPlanRef {
1465 name: "test-policy".to_string(),
1466 },
1467 policy_generation: 1,
1468 reconciliation_mode: CrdReconciliationMode::Authoritative,
1469 owned_roles: vec!["role-a".to_string()],
1470 owned_schemas: vec!["public".to_string()],
1471 managed_database_identity: "default/db/DATABASE_URL".to_string(),
1472 },
1473 );
1474 plan.metadata.namespace = Some("default".to_string());
1475 plan.metadata.annotations = annotations;
1476 plan.status = Some(PostgresPolicyPlanStatus {
1477 phase,
1478 ..Default::default()
1479 });
1480 plan
1481 }
1482
1483 #[test]
1484 fn check_plan_approval_pending_when_no_annotations() {
1485 let plan = test_plan("plan-1", PlanPhase::Pending, None);
1486 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1487 }
1488
1489 #[test]
1490 fn check_plan_approval_approved_with_annotation() {
1491 let annotations =
1492 BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
1493 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1494 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
1495 }
1496
1497 #[test]
1498 fn check_plan_approval_rejected_with_annotation() {
1499 let annotations =
1500 BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
1501 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1502 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1503 }
1504
1505 #[test]
1506 fn check_plan_approval_rejected_wins_over_approved() {
1507 let annotations = BTreeMap::from([
1508 (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
1509 (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
1510 ]);
1511 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1512 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
1513 }
1514
1515 #[test]
1516 fn check_plan_approval_non_true_value_is_pending() {
1517 let annotations =
1518 BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
1519 let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
1520 assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
1521 }
1522
1523 #[test]
1524 fn compute_sql_hash_is_deterministic() {
1525 let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
1526 let hash1 = compute_sql_hash(sql);
1527 let hash2 = compute_sql_hash(sql);
1528 assert_eq!(hash1, hash2);
1529 assert_eq!(hash1.len(), 64); }
1531
1532 #[test]
1533 fn compute_sql_hash_differs_for_different_sql() {
1534 let hash1 = compute_sql_hash("CREATE ROLE a;");
1535 let hash2 = compute_sql_hash("CREATE ROLE b;");
1536 assert_ne!(hash1, hash2);
1537 }
1538
1539 #[test]
1540 fn compute_sql_hash_matches_pinned_fixture() {
1541 assert_eq!(
1542 compute_sql_hash("CREATE ROLE app LOGIN;"),
1543 "12a9743285d98ce73cfa9c840e943fc627d1fcbce22c5206fda1b21c84c1ac9c"
1544 );
1545 }
1546
1547 #[test]
1548 fn generate_plan_name_has_expected_format() {
1549 let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1550 let name = generate_plan_name("my-policy", hash);
1551 assert!(name.starts_with("my-policy-plan-"));
1552 assert!(name.ends_with("-abcdef012345"));
1553 let suffix = name.strip_prefix("my-policy-plan-").unwrap();
1554 assert_eq!(suffix.len(), 28);
1556 assert_eq!(&suffix[8..9], "-");
1557 assert_eq!(&suffix[15..16], "-");
1558 }
1559
1560 #[test]
1561 fn generate_plan_name_is_idempotent_for_same_hash_in_same_second() {
1562 let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1563 let name1 = generate_plan_name("my-policy", hash);
1564 let name2 = generate_plan_name("my-policy", hash);
1565 assert_eq!(name1, name2);
1566 }
1567
1568 #[test]
1569 fn generate_plan_name_truncates_on_utf8_boundary() {
1570 let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
1571 let name = generate_plan_name(&"é".repeat(140), hash);
1572 assert!(name.len() <= 249);
1573 assert!(name.ends_with("-abcdef012345"));
1574 }
1575
1576 #[test]
1577 fn plan_label_value_is_stable_and_label_safe_for_long_names() {
1578 let plan_name = "very-long-policy-name-".repeat(20);
1579 let label = plan_label_value(&plan_name);
1580 assert_eq!(label, plan_label_value(&plan_name));
1581 assert_eq!(label.len(), 32);
1582 assert!(label.chars().all(|ch| ch.is_ascii_hexdigit()));
1583 }
1584
1585 #[test]
1586 fn existing_non_pending_plan_status_is_not_repatched_on_create_conflict() {
1587 let approved = test_plan("plan-1", PlanPhase::Approved, None);
1588 let applying = test_plan("plan-1", PlanPhase::Applying, None);
1589 let applied = test_plan("plan-1", PlanPhase::Applied, None);
1590
1591 assert!(!should_patch_existing_plan_status(&approved));
1592 assert!(!should_patch_existing_plan_status(&applying));
1593 assert!(!should_patch_existing_plan_status(&applied));
1594 }
1595
1596 #[test]
1597 fn existing_pending_or_statusless_plan_can_be_patched_on_create_conflict() {
1598 let pending = test_plan("plan-1", PlanPhase::Pending, None);
1599 let mut statusless = pending.clone();
1600 statusless.status = None;
1601
1602 assert!(should_patch_existing_plan_status(&pending));
1603 assert!(should_patch_existing_plan_status(&statusless));
1604 }
1605
1606 #[test]
1607 fn prepare_plan_sql_keeps_small_sql_inline() {
1608 let prepared = prepare_plan_sql("plan-1", "CREATE ROLE app LOGIN;").unwrap();
1609
1610 assert!(matches!(prepared.artifact, PlanSqlArtifact::Inline(_)));
1611 assert_eq!(
1612 prepared.sql_inline(),
1613 Some("CREATE ROLE app LOGIN;".to_string())
1614 );
1615 assert!(prepared.sql_ref().is_none());
1616 assert!(!prepared.is_truncated());
1617 }
1618
1619 #[test]
1620 fn prepare_plan_sql_compresses_large_brownfield_sized_sql() {
1621 let sql = brownfield_sized_sql();
1622 assert!(sql.len() > 1_048_576);
1623
1624 let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1625
1626 let PlanSqlArtifact::CompressedConfigMap {
1627 key,
1628 compressed_sql,
1629 ..
1630 } = &prepared.artifact
1631 else {
1632 panic!("expected compressed ConfigMap artifact");
1633 };
1634 assert_eq!(key, SQL_CONFIGMAP_GZIP_KEY);
1635 assert!(compressed_sql.len() < MAX_CONFIGMAP_SQL_BYTES);
1636 assert_eq!(gunzip(compressed_sql), sql);
1637 assert_eq!(
1638 prepared.sql_ref().unwrap().compression,
1639 Some(SqlCompression::Gzip)
1640 );
1641 assert_eq!(prepared.original_bytes, sql.len());
1642 assert_eq!(prepared.stored_bytes, compressed_sql.len());
1643 }
1644
1645 #[test]
1646 fn configmap_binary_data_serializes_with_one_base64_layer() {
1647 let sql = brownfield_sized_sql();
1648 let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1649 let PlanSqlArtifact::CompressedConfigMap {
1650 key,
1651 compressed_sql,
1652 ..
1653 } = &prepared.artifact
1654 else {
1655 panic!("expected compressed ConfigMap artifact");
1656 };
1657 let configmap = ConfigMap {
1658 binary_data: Some(BTreeMap::from([(
1659 key.clone(),
1660 ByteString(compressed_sql.clone()),
1661 )])),
1662 ..Default::default()
1663 };
1664
1665 let encoded = serde_json::to_value(&configmap).unwrap()["binaryData"][key]
1666 .as_str()
1667 .unwrap()
1668 .to_string();
1669 let decoded = base64::engine::general_purpose::STANDARD
1670 .decode(encoded)
1671 .unwrap();
1672
1673 assert_eq!(decoded, *compressed_sql);
1674 assert_eq!(gunzip(&decoded), sql);
1675 }
1676
1677 #[test]
1678 fn prepare_plan_sql_truncates_when_compressed_sql_is_still_too_large() {
1679 let sql = deterministic_incompressible_sql(1_400_000);
1680 let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
1681
1682 let PlanSqlArtifact::TruncatedInline(preview) = &prepared.artifact else {
1683 panic!("expected truncated inline artifact");
1684 };
1685 assert!(preview.len() <= MAX_INLINE_SQL_BYTES);
1686 assert!(preview.contains("truncated"));
1687 assert!(prepared.sql_ref().is_none());
1688 assert!(prepared.is_truncated());
1689 }
1690
1691 #[test]
1692 fn sanitize_label_value_replaces_slashes() {
1693 let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
1694 assert!(!sanitized.contains('/'));
1695 assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
1696 }
1697
1698 #[test]
1699 fn sanitize_label_value_truncates_to_63_chars() {
1700 let long_value = "a".repeat(100);
1701 let sanitized = sanitize_label_value(&long_value);
1702 assert!(sanitized.len() <= 63);
1703 }
1704
1705 #[test]
1706 fn stale_policy_sql_configmap_without_plan_label_is_orphan() {
1707 let configmap = ConfigMap {
1708 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1709 labels: Some(BTreeMap::from([(
1710 LABEL_POLICY.to_string(),
1711 sanitize_label_value("test-policy"),
1712 )])),
1713 creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1714 jiff::Timestamp::from_second(0).unwrap(),
1715 )),
1716 ..Default::default()
1717 },
1718 ..Default::default()
1719 };
1720
1721 assert!(is_orphan_sql_configmap(
1722 &configmap,
1723 &BTreeSet::new(),
1724 &BTreeSet::new(),
1725 ORPHAN_GRACE_SECS + 1
1726 ));
1727 }
1728
1729 #[test]
1730 fn stale_policy_sql_configmap_with_current_plan_name_is_not_orphan() {
1731 let plan_name = "test-policy-plan-20260506-000000-abcdef012345";
1732 let configmap = ConfigMap {
1733 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1734 name: Some(format!("{plan_name}-sql")),
1735 labels: Some(BTreeMap::from([
1736 (
1737 LABEL_POLICY.to_string(),
1738 sanitize_label_value("test-policy"),
1739 ),
1740 (
1741 LABEL_PLAN.to_string(),
1742 sanitize_label_value("legacy-colliding-label"),
1743 ),
1744 ])),
1745 creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1746 jiff::Timestamp::from_second(0).unwrap(),
1747 )),
1748 ..Default::default()
1749 },
1750 ..Default::default()
1751 };
1752
1753 assert!(!is_orphan_sql_configmap(
1754 &configmap,
1755 &BTreeSet::from([plan_name.to_string()]),
1756 &BTreeSet::new(),
1757 ORPHAN_GRACE_SECS + 1
1758 ));
1759 }
1760
1761 #[test]
1762 fn stale_policy_sql_configmap_with_known_hash_plan_label_is_not_orphan() {
1763 let plan_name = "test-policy-plan-20260506-000000-abcdef012345";
1764 let plan_label = plan_label_value(plan_name);
1765 let configmap = ConfigMap {
1766 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1767 name: Some("different-plan-sql".to_string()),
1768 labels: Some(BTreeMap::from([
1769 (
1770 LABEL_POLICY.to_string(),
1771 sanitize_label_value("test-policy"),
1772 ),
1773 (LABEL_PLAN.to_string(), plan_label.clone()),
1774 ])),
1775 creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1776 jiff::Timestamp::from_second(0).unwrap(),
1777 )),
1778 ..Default::default()
1779 },
1780 ..Default::default()
1781 };
1782
1783 assert!(!is_orphan_sql_configmap(
1784 &configmap,
1785 &BTreeSet::new(),
1786 &BTreeSet::from([plan_label]),
1787 ORPHAN_GRACE_SECS + 1
1788 ));
1789 }
1790
1791 #[test]
1792 fn stale_policy_sql_configmap_with_only_legacy_colliding_label_is_orphan() {
1793 let plan_name =
1794 "very-long-policy-name-that-would-have-collided-plan-20260506-000000-abcdef012345";
1795 let legacy_label = sanitize_label_value(plan_name);
1796 let configmap = ConfigMap {
1797 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
1798 name: Some("deleted-historical-plan-sql".to_string()),
1799 labels: Some(BTreeMap::from([
1800 (
1801 LABEL_POLICY.to_string(),
1802 sanitize_label_value("test-policy"),
1803 ),
1804 (LABEL_PLAN.to_string(), legacy_label.clone()),
1805 ])),
1806 creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
1807 jiff::Timestamp::from_second(0).unwrap(),
1808 )),
1809 ..Default::default()
1810 },
1811 ..Default::default()
1812 };
1813
1814 assert!(is_orphan_sql_configmap(
1815 &configmap,
1816 &BTreeSet::new(),
1817 &BTreeSet::new(),
1818 ORPHAN_GRACE_SECS + 1
1819 ));
1820 }
1821
1822 #[test]
1823 fn render_redacted_sql_masks_passwords() {
1824 let changes = vec![
1825 pgroles_core::diff::Change::CreateRole {
1826 name: "app".to_string(),
1827 state: pgroles_core::model::RoleState {
1828 login: true,
1829 ..pgroles_core::model::RoleState::default()
1830 },
1831 },
1832 pgroles_core::diff::Change::SetPassword {
1833 name: "app".to_string(),
1834 password: "super_secret".to_string(),
1835 },
1836 ];
1837 let ctx = pgroles_core::sql::SqlContext::default();
1838 let redacted = render_redacted_sql(&changes, &ctx);
1839
1840 assert!(redacted.contains("[REDACTED]"));
1841 assert!(!redacted.contains("super_secret"));
1842 assert!(redacted.contains("CREATE ROLE"));
1843 }
1844
1845 #[test]
1846 fn render_full_sql_includes_passwords() {
1847 let changes = vec![pgroles_core::diff::Change::SetPassword {
1848 name: "app".to_string(),
1849 password: "super_secret".to_string(),
1850 }];
1851 let ctx = pgroles_core::sql::SqlContext::default();
1852 let full = render_full_sql(&changes, &ctx);
1853
1854 assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
1855 }
1856
1857 #[test]
1858 fn now_epoch_secs_returns_plausible_value() {
1859 let now = now_epoch_secs();
1860 let y2025 = 1_735_689_600_i64;
1862 let y2100 = 4_102_444_800_i64;
1863 assert!(
1864 now > y2025 && now < y2100,
1865 "epoch secs {now} should be between 2025 and 2100"
1866 );
1867 }
1868
1869 fn brownfield_sized_sql() -> String {
1870 let mut sql = String::new();
1871 for schema in 0..33 {
1872 for profile in ["reader", "writer", "owner", "cdc"] {
1873 let role = format!("schema_{schema}_{profile}");
1874 sql.push_str(&format!(
1875 "CREATE ROLE \"{role}\" LOGIN;\nCOMMENT ON ROLE \"{role}\" IS 'Generated from profile {profile} for brownfield migration schema {schema} with cdc ownership directives and review metadata';\n"
1876 ));
1877 for relkind in ["TABLES", "SEQUENCES", "FUNCTIONS"] {
1878 sql.push_str(&format!(
1879 "GRANT SELECT ON ALL {relkind} IN SCHEMA \"schema_{schema}\" TO \"{role}\";\n"
1880 ));
1881 }
1882 for owner in 0..20 {
1883 sql.push_str(&format!(
1884 "ALTER DEFAULT PRIVILEGES FOR ROLE \"owner_{owner}\" IN SCHEMA \"schema_{schema}\" GRANT SELECT ON TABLES TO \"{role}\";\n"
1885 ));
1886 }
1887 }
1888 }
1889 for member in 0..70 {
1890 sql.push_str(&format!(
1891 "GRANT \"group_{member}\" TO \"service_login_{}\";\n",
1892 member % 20
1893 ));
1894 }
1895 while sql.len() <= 1_100_000 {
1896 sql.push_str("-- brownfield migration padding for large plan regression\n");
1897 }
1898 sql
1899 }
1900
1901 fn deterministic_incompressible_sql(target_bytes: usize) -> String {
1902 let mut state = 0x1234_5678_u64;
1903 let mut sql = String::with_capacity(target_bytes);
1904 while sql.len() < target_bytes {
1905 state ^= state << 13;
1906 state ^= state >> 7;
1907 state ^= state << 17;
1908 let value = (state % 62) as u8;
1909 let ch = match value {
1910 0..=9 => b'0' + value,
1911 10..=35 => b'a' + (value - 10),
1912 _ => b'A' + (value - 36),
1913 };
1914 sql.push(ch as char);
1915 if sql.len().is_multiple_of(120) {
1916 sql.push('\n');
1917 }
1918 }
1919 sql
1920 }
1921
1922 fn gunzip(bytes: &[u8]) -> String {
1923 let mut decoder = GzDecoder::new(bytes);
1924 let mut decoded = String::new();
1925 decoder.read_to_string(&mut decoded).unwrap();
1926 decoded
1927 }
1928}