1use std::collections::{BTreeMap, BTreeSet};
8use std::io::Write;
9use std::time::Duration;
10
11use flate2::Compression;
12use flate2::write::GzEncoder;
13use k8s_openapi::ByteString;
14use k8s_openapi::api::core::v1::ConfigMap;
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
16use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams};
17use kube::{Client, Resource, ResourceExt};
18use sha2::{Digest, Sha256};
19use tracing::info;
20
21use crate::crd::{
22 ChangeSummary, CrdReconciliationMode, LABEL_DATABASE_IDENTITY, LABEL_PLAN, LABEL_POLICY,
23 PLAN_APPROVED_ANNOTATION, PLAN_REJECTED_ANNOTATION, PlanPhase, PlanReference, PolicyCondition,
24 PolicyPlanRef, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyPlanSpec,
25 PostgresPolicyPlanStatus, SqlCompression, SqlRef,
26};
27use crate::reconciler::ReconcileError;
28
/// Outcome of attempting to create a plan for a policy.
#[derive(Debug, Clone)]
pub enum PlanCreationResult {
    /// A brand-new plan object was created; holds the plan name.
    Created(String),
    /// An equivalent plan already existed (same SQL hash); holds its name.
    Deduplicated(String),
}
38
39impl PlanCreationResult {
40 pub fn plan_name(&self) -> &str {
42 match self {
43 PlanCreationResult::Created(name) | PlanCreationResult::Deduplicated(name) => name,
44 }
45 }
46
47 pub fn is_created(&self) -> bool {
49 matches!(self, PlanCreationResult::Created(_))
50 }
51}
52
/// Redacted SQL at or below this size is stored inline in the plan status.
const MAX_INLINE_SQL_BYTES: usize = 16 * 1024;

/// Data key under which gzip-compressed SQL is stored in the companion ConfigMap.
const SQL_CONFIGMAP_GZIP_KEY: &str = "plan.sql.gz";

/// Upper bound for compressed SQL kept in a ConfigMap, leaving headroom below
/// the Kubernetes object size limit.
const MAX_CONFIGMAP_SQL_BYTES: usize = 900 * 1024;

/// Objects younger than this are never treated as stale/orphaned by cleanup.
const ORPHAN_GRACE_SECS: i64 = 60;

/// Hard timeout for the best-effort cleanup pass.
const CLEANUP_TIMEOUT_SECS: u64 = 5;

/// Default number of terminal plans (Applied/Failed/Superseded/Rejected) retained.
const DEFAULT_MAX_PLANS: usize = 10;

/// Window during which a Failed plan with identical SQL suppresses re-creation.
const FAILED_PLAN_DEDUP_WINDOW_SECS: i64 = 120;
76
/// Where the redacted plan SQL is physically stored.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PlanSqlArtifact {
    /// Small SQL kept directly in the plan status.
    Inline(String),
    /// Large SQL gzip-compressed into a companion ConfigMap.
    CompressedConfigMap {
        // Name of the ConfigMap holding the compressed SQL.
        configmap_name: String,
        // Data key inside the ConfigMap.
        key: String,
        // Gzipped redacted SQL bytes.
        compressed_sql: Vec<u8>,
    },
    /// SQL too large even after compression; only a truncated inline preview kept.
    TruncatedInline(String),
}
87
/// Redacted plan SQL prepared for storage, plus size/hash bookkeeping.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PreparedPlanSql {
    // How/where the SQL is stored.
    artifact: PlanSqlArtifact,
    // Hex SHA-256 of the redacted SQL (distinct from the full-SQL hash).
    redacted_sql_hash: String,
    // Size of the redacted SQL before compression/truncation.
    original_bytes: usize,
    // Size actually stored (inline, compressed, or truncated).
    stored_bytes: usize,
}
95
96impl PreparedPlanSql {
97 fn sql_ref(&self) -> Option<SqlRef> {
98 match &self.artifact {
99 PlanSqlArtifact::CompressedConfigMap {
100 configmap_name,
101 key,
102 ..
103 } => Some(SqlRef {
104 name: configmap_name.clone(),
105 key: key.clone(),
106 compression: Some(SqlCompression::Gzip),
107 }),
108 PlanSqlArtifact::Inline(_) | PlanSqlArtifact::TruncatedInline(_) => None,
109 }
110 }
111
112 fn sql_inline(&self) -> Option<String> {
113 match &self.artifact {
114 PlanSqlArtifact::Inline(sql) | PlanSqlArtifact::TruncatedInline(sql) => {
115 Some(sql.clone())
116 }
117 PlanSqlArtifact::CompressedConfigMap { .. } => None,
118 }
119 }
120
121 fn is_truncated(&self) -> bool {
122 matches!(self.artifact, PlanSqlArtifact::TruncatedInline(_))
123 }
124}
125
/// Approval state of a plan, derived from its annotations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanApprovalState {
    /// Neither approved nor rejected yet.
    Pending,
    /// Approval annotation set to "true".
    Approved,
    /// Rejection annotation set to "true" (takes precedence over approval).
    Rejected,
}
137
138pub fn check_plan_approval(plan: &PostgresPolicyPlan) -> PlanApprovalState {
143 let annotations = plan.metadata.annotations.as_ref();
144
145 let rejected = annotations
146 .and_then(|a| a.get(PLAN_REJECTED_ANNOTATION))
147 .map(|v| v == "true")
148 .unwrap_or(false);
149
150 if rejected {
151 return PlanApprovalState::Rejected;
152 }
153
154 let approved = annotations
155 .and_then(|a| a.get(PLAN_APPROVED_ANNOTATION))
156 .map(|v| v == "true")
157 .unwrap_or(false);
158
159 if approved {
160 return PlanApprovalState::Approved;
161 }
162
163 PlanApprovalState::Pending
164}
165
/// Computes a new `PostgresPolicyPlan` for `policy` from `changes` and creates
/// it in the cluster, unless an equivalent plan already exists.
///
/// Flow:
/// 1. Render full + redacted SQL; hash the full SQL.
/// 2. Best-effort cleanup of old plans/ConfigMaps (bounded by a timeout).
/// 3. Dedup: an existing Pending plan with the same SQL hash, or a plan that
///    Failed with the same hash within `FAILED_PLAN_DEDUP_WINDOW_SECS`,
///    short-circuits with `PlanCreationResult::Deduplicated`.
/// 4. Otherwise create the plan (and, for large SQL, a companion ConfigMap),
///    patch its status to Pending, and mark older Pending plans Superseded.
///
/// On failure after partial creation, the plan and/or ConfigMap are rolled
/// back best-effort before the error is returned.
#[allow(clippy::too_many_arguments)]
pub async fn create_or_update_plan(
    client: &Client,
    policy: &PostgresPolicy,
    changes: &[pgroles_core::diff::Change],
    sql_context: &pgroles_core::sql::SqlContext,
    inspect_config: &pgroles_inspect::InspectConfig,
    reconciliation_mode: CrdReconciliationMode,
    database_identity: &str,
    change_summary: &ChangeSummary,
) -> Result<PlanCreationResult, ReconcileError> {
    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
    let policy_name = policy.name_any();
    let generation = policy.metadata.generation.unwrap_or(0);

    // Full SQL (with real secrets) is only hashed, never stored.
    let full_sql = render_full_sql(changes, sql_context);

    let sql_hash = compute_sql_hash(&full_sql);

    // Approximation: statements are rendered one per line, so count non-empty lines.
    let sql_statement_count = full_sql.lines().filter(|l| !l.trim().is_empty()).count() as i64;

    // Redacted SQL (passwords masked) is what gets persisted for humans to review.
    let redacted_sql = render_redacted_sql(changes, sql_context);

    cleanup_old_plans_best_effort(client, policy, None).await;

    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);

    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
    let existing_plans = plans_api
        .list(&ListParams::default().labels(&label_selector))
        .await?;

    // Dedup 1: a Pending plan with identical SQL already covers this change set.
    for plan in &existing_plans {
        if let Some(ref status) = plan.status
            && status.phase == PlanPhase::Pending
            && status.sql_hash.as_deref() == Some(&sql_hash)
        {
            let plan_name = plan.name_any();
            info!(
                plan = %plan_name,
                policy = %policy_name,
                "existing pending plan has identical SQL hash, skipping creation"
            );
            return Ok(PlanCreationResult::Deduplicated(plan_name));
        }
    }

    // Dedup 2: don't immediately recreate a plan that just failed with the same
    // SQL — give the operator/user the dedup window to look at the failure.
    let now_ts = now_epoch_secs();
    for plan in &existing_plans {
        if let Some(ref status) = plan.status
            && status.phase == PlanPhase::Failed
            && status.sql_hash.as_deref() == Some(&sql_hash)
        {
            let failed_ts = status
                .failed_at
                .as_deref()
                .and_then(parse_rfc3339_epoch_secs)
                .unwrap_or(0);
            if failed_ts > 0 && now_ts - failed_ts < FAILED_PLAN_DEDUP_WINDOW_SECS {
                let plan_name = plan.name_any();
                info!(
                    plan = %plan_name,
                    policy = %policy_name,
                    age_secs = now_ts - failed_ts,
                    "recently-failed plan has identical SQL hash, skipping creation"
                );
                return Ok(PlanCreationResult::Deduplicated(plan_name));
            }
        }
    }

    let plan_name = generate_plan_name(&policy_name, &sql_hash);
    // Decide how to store the redacted SQL (inline / ConfigMap / truncated).
    let prepared_sql = prepare_plan_sql(&plan_name, &redacted_sql)?;

    // Create the ConfigMap first so the plan never references a missing object.
    let sql_configmap_name = create_plan_sql_configmap(
        client,
        policy,
        &namespace,
        &policy_name,
        database_identity,
        &prepared_sql,
    )
    .await?;

    let owner_ref = build_owner_reference(policy);

    let plan = PostgresPolicyPlan::new(
        &plan_name,
        PostgresPolicyPlanSpec {
            policy_ref: PolicyPlanRef {
                name: policy_name.clone(),
            },
            policy_generation: generation,
            reconciliation_mode,
            owned_roles: inspect_config.managed_roles.clone(),
            owned_schemas: inspect_config.managed_schemas.clone(),
            managed_database_identity: database_identity.to_string(),
        },
    );
    let mut plan = plan;
    plan.metadata.namespace = Some(namespace.clone());
    // Owned by the policy so it is garbage-collected with it.
    plan.metadata.owner_references = Some(vec![owner_ref.clone()]);
    plan.metadata.labels = Some(BTreeMap::from([
        (LABEL_POLICY.to_string(), sanitize_label_value(&policy_name)),
        (
            LABEL_DATABASE_IDENTITY.to_string(),
            sanitize_label_value(database_identity),
        ),
    ]));

    // Short human-readable summary annotations for kubectl users.
    let sql_preview = redacted_sql.lines().take(5).collect::<Vec<_>>().join("\n");
    let summary_text = format!(
        "{}R {}G {}D {}DP {}M",
        change_summary.roles_created + change_summary.roles_altered,
        change_summary.grants_added,
        change_summary.default_privileges_set,
        change_summary.roles_dropped,
        change_summary.members_added,
    );
    plan.metadata.annotations = Some(BTreeMap::from([
        ("pgroles.io/sql-preview".to_string(), sql_preview),
        ("pgroles.io/summary".to_string(), summary_text),
        (
            // SHA-256 hex is 64 chars, so taking the first 12 cannot panic.
            "pgroles.io/sql-hash".to_string(),
            sql_hash[..12].to_string(),
        ),
        (
            "pgroles.io/redacted-sql-hash".to_string(),
            prepared_sql.redacted_sql_hash[..12].to_string(),
        ),
        (
            "pgroles.io/sql-original-bytes".to_string(),
            prepared_sql.original_bytes.to_string(),
        ),
        (
            "pgroles.io/sql-stored-bytes".to_string(),
            prepared_sql.stored_bytes.to_string(),
        ),
    ]));

    // 409 means another reconcile beat us to the same name (name includes the
    // SQL-hash suffix), so adopt the existing object instead of failing.
    let (created_plan, created_new_plan) =
        match plans_api.create(&PostParams::default(), &plan).await {
            Ok(plan) => (plan, true),
            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
                let existing = plans_api.get(&plan_name).await?;
                (existing, false)
            }
            Err(err) => {
                // Roll back the SQL ConfigMap we created for this plan.
                if let Some(configmap_name) = sql_configmap_name.as_deref() {
                    delete_configmap_best_effort(client, &namespace, configmap_name).await;
                }
                return Err(err.into());
            }
        };
    let plan_name = created_plan.name_any();

    let computed_message = if prepared_sql.is_truncated() {
        format!(
            "Plan computed with {} change(s); SQL preview truncated because compressed SQL exceeded Kubernetes ConfigMap limits",
            change_summary.total
        )
    } else {
        format!("Plan computed with {} change(s)", change_summary.total)
    };
    let plan_status = PostgresPolicyPlanStatus {
        phase: PlanPhase::Pending,
        conditions: vec![
            PolicyCondition {
                condition_type: "Computed".to_string(),
                status: "True".to_string(),
                reason: Some("PlanComputed".to_string()),
                message: Some(computed_message),
                last_transition_time: Some(crate::crd::now_rfc3339()),
            },
            PolicyCondition {
                condition_type: "Approved".to_string(),
                status: "False".to_string(),
                reason: Some("PendingApproval".to_string()),
                message: Some("Plan awaiting approval".to_string()),
                last_transition_time: Some(crate::crd::now_rfc3339()),
            },
        ],
        change_summary: Some(change_summary.clone()),
        sql_ref: prepared_sql.sql_ref(),
        sql_inline: prepared_sql.sql_inline(),
        sql_truncated: prepared_sql.is_truncated(),
        computed_at: Some(crate::crd::now_rfc3339()),
        applied_at: None,
        last_error: None,
        sql_hash: Some(sql_hash),
        applying_since: None,
        failed_at: None,
        sql_statements: Some(sql_statement_count),
        redacted_sql_hash: Some(prepared_sql.redacted_sql_hash.clone()),
        sql_original_bytes: Some(prepared_sql.original_bytes as i64),
        sql_stored_bytes: Some(prepared_sql.stored_bytes as i64),
    };

    let status_patch = serde_json::json!({ "status": plan_status });
    if let Err(err) = plans_api
        .patch_status(
            &plan_name,
            &PatchParams::apply("pgroles-operator"),
            &Patch::Merge(&status_patch),
        )
        .await
    {
        // Status patch failed: undo what we created (only the plan if it was
        // ours, plus the ConfigMap) so no status-less plan is left behind.
        if created_new_plan {
            delete_plan_best_effort(&plans_api, &plan_name).await;
        }
        if let Some(configmap_name) = sql_configmap_name.as_deref() {
            delete_configmap_best_effort(client, &namespace, configmap_name).await;
        }
        return Err(err.into());
    }

    // Any other plan still Pending is now out of date: mark it Superseded.
    for plan in &existing_plans {
        if let Some(ref status) = plan.status
            && status.phase == PlanPhase::Pending
            && plan.name_any() != plan_name
        {
            let old_plan_name = plan.name_any();
            info!(
                plan = %old_plan_name,
                policy = %policy_name,
                "marking existing pending plan as Superseded"
            );
            let superseded_status = PostgresPolicyPlanStatus {
                phase: PlanPhase::Superseded,
                ..status.clone()
            };
            let patch = serde_json::json!({ "status": superseded_status });
            plans_api
                .patch_status(
                    &old_plan_name,
                    &PatchParams::apply("pgroles-operator"),
                    &Patch::Merge(&patch),
                )
                .await?;
        }
    }

    info!(
        plan = %plan_name,
        policy = %policy_name,
        changes = change_summary.total,
        "created new plan"
    );

    Ok(PlanCreationResult::Created(plan_name))
}
457
/// Executes an approved plan's changes against the database inside a single
/// transaction, then records the outcome in the plan's status.
///
/// Status transitions: `Applying` before execution, then `Applied` (with
/// `applied_at` set) on success or `Failed` (with `last_error`/`failed_at`)
/// on error. A failure to record the `Failed` status is only logged — the
/// original execution error is what gets returned.
pub async fn execute_plan(
    client: &Client,
    plan: &PostgresPolicyPlan,
    pool: &sqlx::PgPool,
    sql_context: &pgroles_core::sql::SqlContext,
    changes: &[pgroles_core::diff::Change],
) -> Result<(), ReconcileError> {
    let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
    let plan_name = plan.name_any();
    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);

    // Mark the plan as in-flight (also stamps applying_since).
    update_plan_phase(&plans_api, &plan_name, PlanPhase::Applying).await?;

    // All statements commit or roll back together.
    let result = execute_changes_in_transaction(pool, changes, sql_context).await;

    match result {
        Ok(statements_executed) => {
            let mut applied_status = plan.status.clone().unwrap_or_default();
            applied_status.phase = PlanPhase::Applied;
            applied_status.applied_at = Some(crate::crd::now_rfc3339());
            applied_status.last_error = None;
            set_plan_condition(
                &mut applied_status.conditions,
                "Approved",
                "True",
                "Approved",
                "Plan approved and executed",
            );

            let patch = serde_json::json!({ "status": applied_status });
            plans_api
                .patch_status(
                    &plan_name,
                    &PatchParams::apply("pgroles-operator"),
                    &Patch::Merge(&patch),
                )
                .await?;

            info!(
                plan = %plan_name,
                statements = statements_executed,
                "plan executed successfully"
            );
            Ok(())
        }
        Err(err) => {
            let error_message = err.to_string();
            let mut failed_status = plan.status.clone().unwrap_or_default();
            failed_status.phase = PlanPhase::Failed;
            failed_status.last_error = Some(error_message);
            // failed_at also drives the failed-plan dedup window on re-reconcile.
            failed_status.failed_at = Some(crate::crd::now_rfc3339());

            let patch = serde_json::json!({ "status": failed_status });
            // Best-effort: the execution error must not be masked by a status
            // update failure, so only warn here.
            if let Err(status_err) = plans_api
                .patch_status(
                    &plan_name,
                    &PatchParams::apply("pgroles-operator"),
                    &Patch::Merge(&patch),
                )
                .await
            {
                tracing::warn!(
                    plan = %plan_name,
                    %status_err,
                    "failed to update plan status to Failed"
                );
            }

            Err(err)
        }
    }
}
546
547fn prepare_plan_sql(
548 plan_name: &str,
549 redacted_sql: &str,
550) -> Result<PreparedPlanSql, ReconcileError> {
551 let original_bytes = redacted_sql.len();
552 let redacted_sql_hash = compute_sql_hash(redacted_sql);
553
554 if original_bytes <= MAX_INLINE_SQL_BYTES {
555 return Ok(PreparedPlanSql {
556 artifact: PlanSqlArtifact::Inline(redacted_sql.to_string()),
557 redacted_sql_hash,
558 original_bytes,
559 stored_bytes: original_bytes,
560 });
561 }
562
563 let compressed_sql = gzip_bytes(redacted_sql.as_bytes())?;
564 if compressed_sql.len() <= MAX_CONFIGMAP_SQL_BYTES {
565 let stored_bytes = compressed_sql.len();
566 return Ok(PreparedPlanSql {
567 artifact: PlanSqlArtifact::CompressedConfigMap {
568 configmap_name: format!("{plan_name}-sql"),
569 key: SQL_CONFIGMAP_GZIP_KEY.to_string(),
570 compressed_sql,
571 },
572 redacted_sql_hash,
573 original_bytes,
574 stored_bytes,
575 });
576 }
577
578 let truncated = truncate_utf8(
579 redacted_sql,
580 MAX_INLINE_SQL_BYTES,
581 "\n-- truncated: compressed SQL preview exceeded Kubernetes ConfigMap limits --",
582 );
583 let stored_bytes = truncated.len();
584 Ok(PreparedPlanSql {
585 artifact: PlanSqlArtifact::TruncatedInline(truncated),
586 redacted_sql_hash,
587 original_bytes,
588 stored_bytes,
589 })
590}
591
592fn gzip_bytes(bytes: &[u8]) -> Result<Vec<u8>, ReconcileError> {
593 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
594 encoder
595 .write_all(bytes)
596 .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))?;
597 encoder
598 .finish()
599 .map_err(|err| ReconcileError::PlanSqlStorage(err.to_string()))
600}
601
/// Truncates `text` to at most `max_bytes` bytes, appending `marker` and
/// cutting only on a UTF-8 character boundary. Text already within the limit
/// is returned unchanged.
fn truncate_utf8(text: &str, max_bytes: usize, marker: &str) -> String {
    if text.len() <= max_bytes {
        return text.to_string();
    }

    // Byte budget left for the text itself once the marker is accounted for.
    let budget = max_bytes.saturating_sub(marker.len()).min(text.len());
    // Largest char-boundary cut that fits the budget.
    let cut = (0..=budget)
        .rev()
        .find(|&i| text.is_char_boundary(i))
        .unwrap_or(0);

    format!("{}{marker}", &text[..cut])
}
617
/// Creates the companion ConfigMap holding compressed plan SQL, if the
/// prepared artifact requires one.
///
/// Returns `Ok(None)` for inline/truncated artifacts. On a 409 conflict the
/// existing ConfigMap is adopted, but only after verifying its recorded
/// redacted-SQL hash matches ours (otherwise an error is returned).
async fn create_plan_sql_configmap(
    client: &Client,
    policy: &PostgresPolicy,
    namespace: &str,
    policy_name: &str,
    database_identity: &str,
    prepared_sql: &PreparedPlanSql,
) -> Result<Option<String>, ReconcileError> {
    // Only the ConfigMap-backed artifact needs an object created.
    let PlanSqlArtifact::CompressedConfigMap {
        configmap_name,
        key: _,
        compressed_sql: _,
    } = &prepared_sql.artifact
    else {
        return Ok(None);
    };

    let configmap = build_plan_sql_configmap_object(
        policy,
        namespace,
        policy_name,
        database_identity,
        prepared_sql,
    )?;

    let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
    match configmaps_api
        .create(&PostParams::default(), &configmap)
        .await
    {
        Ok(_) => Ok(Some(configmap_name.clone())),
        Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
            // Name collision: reuse only if the content hash matches.
            let existing = configmaps_api.get(configmap_name).await?;
            validate_existing_sql_configmap(&existing, prepared_sql)?;
            Ok(Some(configmap_name.clone()))
        }
        Err(err) => Err(err.into()),
    }
}
657
/// Builds the ConfigMap object carrying the gzip-compressed plan SQL, with
/// labels linking it to its policy/plan and annotations recording the SQL
/// hash and sizes (used later to validate reuse and detect orphans).
///
/// Errors if the prepared artifact is not ConfigMap-backed.
fn build_plan_sql_configmap_object(
    policy: &PostgresPolicy,
    namespace: &str,
    policy_name: &str,
    database_identity: &str,
    prepared_sql: &PreparedPlanSql,
) -> Result<ConfigMap, ReconcileError> {
    let PlanSqlArtifact::CompressedConfigMap {
        configmap_name,
        key,
        compressed_sql,
    } = &prepared_sql.artifact
    else {
        return Err(ReconcileError::PlanSqlStorage(
            "cannot build ConfigMap for inline plan SQL".to_string(),
        ));
    };

    Ok(ConfigMap {
        metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
            name: Some(configmap_name.clone()),
            namespace: Some(namespace.to_string()),
            // Owned by the policy so it is garbage-collected with it.
            owner_references: Some(vec![build_owner_reference(policy)]),
            labels: Some(BTreeMap::from([
                (LABEL_POLICY.to_string(), sanitize_label_value(policy_name)),
                (
                    LABEL_DATABASE_IDENTITY.to_string(),
                    sanitize_label_value(database_identity),
                ),
                (
                    // Plan name derived by stripping the "-sql" suffix; used by
                    // orphan cleanup to match ConfigMaps to live plans.
                    LABEL_PLAN.to_string(),
                    sanitize_label_value(configmap_plan_name(configmap_name)),
                ),
            ])),
            annotations: Some(BTreeMap::from([
                ("pgroles.io/sql-compression".to_string(), "gzip".to_string()),
                (
                    "pgroles.io/redacted-sql-hash".to_string(),
                    prepared_sql.redacted_sql_hash.clone(),
                ),
                (
                    "pgroles.io/sql-original-bytes".to_string(),
                    prepared_sql.original_bytes.to_string(),
                ),
                (
                    "pgroles.io/sql-stored-bytes".to_string(),
                    prepared_sql.stored_bytes.to_string(),
                ),
            ])),
            ..Default::default()
        },
        // Compressed bytes go in binary_data, not data.
        binary_data: Some(BTreeMap::from([(
            key.clone(),
            ByteString(compressed_sql.clone()),
        )])),
        ..Default::default()
    })
}
716
/// Recovers the plan name from its SQL ConfigMap name by stripping the
/// "-sql" suffix; names without the suffix are returned as-is.
fn configmap_plan_name(configmap_name: &str) -> &str {
    match configmap_name.strip_suffix("-sql") {
        Some(plan_name) => plan_name,
        None => configmap_name,
    }
}
722
723fn validate_existing_sql_configmap(
724 configmap: &ConfigMap,
725 prepared_sql: &PreparedPlanSql,
726) -> Result<(), ReconcileError> {
727 let Some(annotations) = configmap.metadata.annotations.as_ref() else {
728 return Err(ReconcileError::PlanSqlStorage(format!(
729 "existing ConfigMap {} is missing SQL storage annotations",
730 configmap.name_any()
731 )));
732 };
733 let hash_matches = annotations
734 .get("pgroles.io/redacted-sql-hash")
735 .map(|hash| hash == &prepared_sql.redacted_sql_hash)
736 .unwrap_or(false);
737 if hash_matches {
738 Ok(())
739 } else {
740 Err(ReconcileError::PlanSqlStorage(format!(
741 "existing ConfigMap {} does not match computed SQL preview hash",
742 configmap.name_any()
743 )))
744 }
745}
746
747async fn execute_changes_in_transaction(
751 pool: &sqlx::PgPool,
752 changes: &[pgroles_core::diff::Change],
753 sql_context: &pgroles_core::sql::SqlContext,
754) -> Result<usize, ReconcileError> {
755 let mut transaction = pool.begin().await?;
756 let mut statements_executed = 0usize;
757
758 for change in changes {
759 let is_sensitive = matches!(change, pgroles_core::diff::Change::SetPassword { .. });
760 for sql in pgroles_core::sql::render_statements_with_context(change, sql_context) {
761 if is_sensitive {
762 tracing::debug!("executing: ALTER ROLE ... PASSWORD [REDACTED]");
763 } else {
764 tracing::debug!(%sql, "executing");
765 }
766 sqlx::query(&sql).execute(transaction.as_mut()).await?;
767 statements_executed += 1;
768 }
769 }
770
771 transaction.commit().await?;
772 Ok(statements_executed)
773}
774
775pub async fn cleanup_old_plans_best_effort(
782 client: &Client,
783 policy: &PostgresPolicy,
784 max_plans: Option<usize>,
785) {
786 match tokio::time::timeout(
787 Duration::from_secs(CLEANUP_TIMEOUT_SECS),
788 cleanup_old_plans(client, policy, max_plans),
789 )
790 .await
791 {
792 Ok(Ok(())) => {}
793 Ok(Err(err)) => tracing::warn!(%err, "failed to clean up old plans"),
794 Err(_) => tracing::warn!(
795 timeout_secs = CLEANUP_TIMEOUT_SECS,
796 "timed out cleaning up old plans"
797 ),
798 }
799}
800
/// Cleans up plan objects for `policy`:
/// 1. deletes stale status-less plans (likely crash leftovers past the grace
///    period),
/// 2. trims terminal plans (Applied/Failed/Superseded/Rejected) down to
///    `max_plans` (default `DEFAULT_MAX_PLANS`), oldest first,
/// 3. removes SQL ConfigMaps whose plan no longer exists.
///
/// Individual deletions are best-effort (logged, not propagated); list calls
/// and the orphan-ConfigMap pass can still return errors.
pub async fn cleanup_old_plans(
    client: &Client,
    policy: &PostgresPolicy,
    max_plans: Option<usize>,
) -> Result<(), ReconcileError> {
    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
    let policy_name = policy.name_any();
    let max_plans = max_plans.unwrap_or(DEFAULT_MAX_PLANS);

    let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
    let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
    let existing_plans = plans_api
        .list(&ListParams::default().labels(&label_selector))
        .await?;
    let now_ts = now_epoch_secs();

    // Pass 1: plans that never got a status and are past the grace period.
    for plan in existing_plans
        .iter()
        .filter(|plan| is_stale_statusless_plan(plan, now_ts))
    {
        let plan_name = plan.name_any();
        info!(
            plan = %plan_name,
            policy = %policy_name,
            "cleaning up stale status-less plan"
        );
        if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
            tracing::warn!(
                plan = %plan_name,
                %err,
                "failed to delete stale status-less plan during cleanup"
            );
        }
    }

    // Pass 2: retention — keep at most `max_plans` terminal plans.
    let mut terminal_plans: Vec<&PostgresPolicyPlan> = existing_plans
        .iter()
        .filter(|plan| {
            plan.status
                .as_ref()
                .map(|s| {
                    matches!(
                        s.phase,
                        PlanPhase::Applied
                            | PlanPhase::Failed
                            | PlanPhase::Superseded
                            | PlanPhase::Rejected
                    )
                })
                .unwrap_or(false)
        })
        .collect();

    if terminal_plans.len() > max_plans {
        // Oldest first, so the front of the list is what gets deleted.
        terminal_plans.sort_by(|a, b| {
            let a_time = a.metadata.creation_timestamp.as_ref();
            let b_time = b.metadata.creation_timestamp.as_ref();
            a_time.cmp(&b_time)
        });

        let plans_to_delete = terminal_plans.len() - max_plans;
        for plan in terminal_plans.into_iter().take(plans_to_delete) {
            let plan_name = plan.name_any();
            info!(
                plan = %plan_name,
                policy = %policy_name,
                "cleaning up old plan"
            );
            if let Err(err) = plans_api.delete(&plan_name, &DeleteParams::default()).await {
                tracing::warn!(
                    plan = %plan_name,
                    %err,
                    "failed to delete old plan during cleanup"
                );
            }
        }
    }

    // Pass 3: SQL ConfigMaps whose plan is gone. Note this uses the plan list
    // from before the deletions above; just-deleted plans' ConfigMaps are
    // picked up on the next cleanup run.
    cleanup_orphan_sql_configmaps(
        client,
        &namespace,
        &policy_name,
        &existing_plans.items,
        now_ts,
    )
    .await?;

    Ok(())
}
897
898async fn cleanup_orphan_sql_configmaps(
903 client: &Client,
904 namespace: &str,
905 policy_name: &str,
906 existing_plans: &[PostgresPolicyPlan],
907 now_ts: i64,
908) -> Result<(), ReconcileError> {
909 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
910 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(policy_name));
911 let configmaps = configmaps_api
912 .list(&ListParams::default().labels(&label_selector))
913 .await?;
914 let known_plan_labels: BTreeSet<String> = existing_plans
915 .iter()
916 .map(|plan| sanitize_label_value(&plan.name_any()))
917 .collect();
918
919 for configmap in configmaps {
920 if !is_orphan_sql_configmap(&configmap, &known_plan_labels, now_ts) {
921 continue;
922 }
923
924 let configmap_name = configmap.name_any();
925 info!(
926 configmap = %configmap_name,
927 policy = %policy_name,
928 "cleaning up orphan plan SQL ConfigMap"
929 );
930 if let Err(err) = configmaps_api
931 .delete(&configmap_name, &DeleteParams::default())
932 .await
933 {
934 tracing::warn!(
935 configmap = %configmap_name,
936 %err,
937 "failed to delete orphan plan SQL ConfigMap during cleanup"
938 );
939 }
940 }
941
942 Ok(())
943}
944
945fn is_orphan_sql_configmap(
946 configmap: &ConfigMap,
947 known_plan_labels: &BTreeSet<String>,
948 now_ts: i64,
949) -> bool {
950 let Some(labels) = configmap.metadata.labels.as_ref() else {
951 return false;
952 };
953 if !labels.contains_key(LABEL_POLICY) || !is_stale_object(configmap, now_ts) {
954 return false;
955 }
956 labels
957 .get(LABEL_PLAN)
958 .map(|plan_label| !known_plan_labels.contains(plan_label))
959 .unwrap_or(true)
960}
961
/// A plan with no status that is older than the orphan grace period — likely
/// left behind by a crash between object creation and the status patch.
fn is_stale_statusless_plan(plan: &PostgresPolicyPlan, now_ts: i64) -> bool {
    plan.status.is_none() && is_stale_object(plan, now_ts)
}
965
966fn is_stale_object<K>(resource: &K, now_ts: i64) -> bool
967where
968 K: Resource,
969{
970 resource
971 .meta()
972 .creation_timestamp
973 .as_ref()
974 .map(|timestamp| now_ts.saturating_sub(timestamp.0.as_second()) > ORPHAN_GRACE_SECS)
975 .unwrap_or(false)
976}
977
978async fn delete_plan_best_effort(plans_api: &Api<PostgresPolicyPlan>, plan_name: &str) {
979 if let Err(err) = plans_api.delete(plan_name, &DeleteParams::default()).await {
980 tracing::warn!(
981 plan = %plan_name,
982 %err,
983 "failed to roll back plan after status update failure"
984 );
985 }
986}
987
988async fn delete_configmap_best_effort(client: &Client, namespace: &str, configmap_name: &str) {
989 let configmaps_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
990 if let Err(err) = configmaps_api
991 .delete(configmap_name, &DeleteParams::default())
992 .await
993 {
994 tracing::warn!(
995 configmap = %configmap_name,
996 %err,
997 "failed to roll back plan SQL ConfigMap"
998 );
999 }
1000}
1001
1002pub(crate) fn render_full_sql(
1004 changes: &[pgroles_core::diff::Change],
1005 sql_context: &pgroles_core::sql::SqlContext,
1006) -> String {
1007 changes
1008 .iter()
1009 .flat_map(|change| pgroles_core::sql::render_statements_with_context(change, sql_context))
1010 .collect::<Vec<_>>()
1011 .join("\n")
1012}
1013
1014fn render_redacted_sql(
1016 changes: &[pgroles_core::diff::Change],
1017 sql_context: &pgroles_core::sql::SqlContext,
1018) -> String {
1019 changes
1020 .iter()
1021 .flat_map(|change| {
1022 if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
1023 vec![format!(
1024 "ALTER ROLE {} PASSWORD '[REDACTED]';",
1025 pgroles_core::sql::quote_ident(name)
1026 )]
1027 } else {
1028 pgroles_core::sql::render_statements_with_context(change, sql_context)
1029 }
1030 })
1031 .collect::<Vec<_>>()
1032 .join("\n")
1033}
1034
1035pub(crate) fn compute_sql_hash(sql: &str) -> String {
1037 use std::fmt::Write as _;
1038
1039 let mut hasher = Sha256::new();
1040 hasher.update(sql.as_bytes());
1041 let digest = hasher.finalize();
1042 let mut hex = String::with_capacity(digest.len() * 2);
1043 for byte in digest {
1044 write!(&mut hex, "{byte:02x}").expect("writing to a string should succeed");
1045 }
1046 hex
1047}
1048
1049fn generate_plan_name(policy_name: &str, sql_hash: &str) -> String {
1056 let timestamp = format_timestamp_compact();
1057 let suffix = &sql_hash[..12.min(sql_hash.len())];
1058 let max_name_len = 253 - 4; let max_prefix_len = max_name_len - "-plan-".len() - timestamp.len() - "-".len() - suffix.len();
1062 let prefix = if policy_name.len() > max_prefix_len {
1063 policy_name
1064 .char_indices()
1065 .take_while(|(idx, ch)| idx + ch.len_utf8() <= max_prefix_len)
1066 .map(|(_, ch)| ch)
1067 .collect::<String>()
1068 } else {
1069 policy_name.to_string()
1070 };
1071 format!("{prefix}-plan-{timestamp}-{suffix}")
1072}
1073
1074fn format_timestamp_compact() -> String {
1076 use std::time::SystemTime;
1077 let now = SystemTime::now()
1078 .duration_since(SystemTime::UNIX_EPOCH)
1079 .unwrap_or_default();
1080 let secs = now.as_secs();
1081 let (year, month, day) = crate::crd::days_to_date(secs / 86400);
1082 let remaining = secs % 86400;
1083 let hours = remaining / 3600;
1084 let minutes = (remaining % 3600) / 60;
1085 let seconds = remaining % 60;
1086 format!("{year:04}{month:02}{day:02}-{hours:02}{minutes:02}{seconds:02}")
1087}
1088
/// Maps a value to the Kubernetes label-value alphabet (ASCII alphanumerics,
/// '.', '-', '_'), replacing other characters with '_' and capping length at
/// 63 characters. Note: values beginning/ending with a non-alphanumeric may
/// still be rejected by the API server; callers here use k8s-name-shaped input.
fn sanitize_label_value(value: &str) -> String {
    let keep = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_');
    value
        .chars()
        .take(63)
        .map(|c| if keep(c) { c } else { '_' })
        .collect()
}
1106
/// Seconds since the Unix epoch; 0 if the system clock is before the epoch.
fn now_epoch_secs() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
1114
1115fn parse_rfc3339_epoch_secs(rfc3339: &str) -> Option<i64> {
1118 rfc3339
1120 .parse::<jiff::Timestamp>()
1121 .ok()
1122 .map(|t| t.as_second())
1123}
1124
/// Builds the controller `OwnerReference` pointing at `policy`, so plans and
/// SQL ConfigMaps are garbage-collected when the policy is deleted.
fn build_owner_reference(policy: &PostgresPolicy) -> OwnerReference {
    OwnerReference {
        api_version: PostgresPolicy::api_version(&()).to_string(),
        kind: PostgresPolicy::kind(&()).to_string(),
        name: policy.name_any(),
        // A persisted object should always have a UID; empty string otherwise.
        uid: policy.metadata.uid.clone().unwrap_or_default(),
        controller: Some(true),
        block_owner_deletion: Some(true),
    }
}
1136
/// Patches only the `phase` of a plan's status (plus `applying_since` when
/// entering `Applying`) via a merge patch on the status subresource.
///
/// NOTE(review): this hand-built JSON uses the snake_case key
/// "applying_since" while struct-serialized patches elsewhere go through
/// serde — confirm `PostgresPolicyPlanStatus` does not rename fields
/// (e.g. to camelCase), or this key would silently not match.
async fn update_plan_phase(
    plans_api: &Api<PostgresPolicyPlan>,
    plan_name: &str,
    phase: PlanPhase,
) -> Result<(), ReconcileError> {
    let mut patch_value = serde_json::json!({ "status": { "phase": phase } });
    if phase == PlanPhase::Applying {
        // Stamp when execution started so stuck Applying plans can be detected.
        patch_value["status"]["applying_since"] = serde_json::json!(crate::crd::now_rfc3339());
    }
    plans_api
        .patch_status(
            plan_name,
            &PatchParams::apply("pgroles-operator"),
            &Patch::Merge(&patch_value),
        )
        .await?;
    Ok(())
}
1159
1160fn set_plan_condition(
1165 conditions: &mut Vec<PolicyCondition>,
1166 condition_type: &str,
1167 status: &str,
1168 reason: &str,
1169 message: &str,
1170) {
1171 let transition_time = if let Some(existing) = conditions
1172 .iter()
1173 .find(|c| c.condition_type == condition_type)
1174 {
1175 if existing.status == status {
1176 existing.last_transition_time.clone()
1177 } else {
1178 Some(crate::crd::now_rfc3339())
1179 }
1180 } else {
1181 Some(crate::crd::now_rfc3339())
1182 };
1183
1184 let condition = PolicyCondition {
1185 condition_type: condition_type.to_string(),
1186 status: status.to_string(),
1187 reason: Some(reason.to_string()),
1188 message: Some(message.to_string()),
1189 last_transition_time: transition_time,
1190 };
1191 if let Some(existing) = conditions
1192 .iter_mut()
1193 .find(|c| c.condition_type == condition_type)
1194 {
1195 *existing = condition;
1196 } else {
1197 conditions.push(condition);
1198 }
1199}
1200
/// Records `plan_name` as the policy's current plan reference via a merge
/// patch on the policy's status subresource.
///
/// NOTE(review): the JSON key "current_plan_ref" is hand-written snake_case —
/// confirm it matches the serde naming on the policy status type.
pub async fn update_policy_plan_ref(
    client: &Client,
    policy: &PostgresPolicy,
    plan_name: &str,
) -> Result<(), ReconcileError> {
    let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
    let policy_api: Api<PostgresPolicy> = Api::namespaced(client.clone(), &namespace);

    let patch = serde_json::json!({
        "status": {
            "current_plan_ref": PlanReference {
                name: plan_name.to_string(),
            }
        }
    });

    policy_api
        .patch_status(
            &policy.name_any(),
            &PatchParams::apply("pgroles-operator"),
            &Patch::Merge(&patch),
        )
        .await?;

    Ok(())
}
1228
1229pub async fn get_current_actionable_plan(
1234 client: &Client,
1235 policy: &PostgresPolicy,
1236) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1237 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1238 let policy_name = policy.name_any();
1239
1240 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1241 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1242 let existing_plans = plans_api
1243 .list(&ListParams::default().labels(&label_selector))
1244 .await?;
1245
1246 let mut pending_plans: Vec<PostgresPolicyPlan> = existing_plans
1248 .into_iter()
1249 .filter(|plan| {
1250 plan.status
1251 .as_ref()
1252 .map(|s| matches!(s.phase, PlanPhase::Pending | PlanPhase::Approved))
1253 .unwrap_or(false)
1254 })
1255 .collect();
1256
1257 pending_plans.sort_by(|a, b| {
1258 let a_time = a.metadata.creation_timestamp.as_ref();
1259 let b_time = b.metadata.creation_timestamp.as_ref();
1260 b_time.cmp(&a_time) });
1262
1263 Ok(pending_plans.into_iter().next())
1264}
1265
1266pub async fn get_plan_by_phase(
1268 client: &Client,
1269 policy: &PostgresPolicy,
1270 target_phase: PlanPhase,
1271) -> Result<Option<PostgresPolicyPlan>, ReconcileError> {
1272 let namespace = policy.namespace().ok_or(ReconcileError::NoNamespace)?;
1273 let policy_name = policy.name_any();
1274
1275 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1276 let label_selector = format!("{LABEL_POLICY}={}", sanitize_label_value(&policy_name));
1277 let existing_plans = plans_api
1278 .list(&ListParams::default().labels(&label_selector))
1279 .await?;
1280
1281 let mut matching_plans: Vec<PostgresPolicyPlan> = existing_plans
1282 .into_iter()
1283 .filter(|plan| {
1284 plan.status
1285 .as_ref()
1286 .map(|s| s.phase == target_phase)
1287 .unwrap_or(false)
1288 })
1289 .collect();
1290
1291 matching_plans.sort_by(|a, b| {
1292 let a_time = a.metadata.creation_timestamp.as_ref();
1293 let b_time = b.metadata.creation_timestamp.as_ref();
1294 b_time.cmp(&a_time) });
1296
1297 Ok(matching_plans.into_iter().next())
1298}
1299
1300pub async fn mark_plan_failed(
1302 client: &Client,
1303 plan: &PostgresPolicyPlan,
1304 error_message: &str,
1305) -> Result<(), ReconcileError> {
1306 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1307 let plan_name = plan.name_any();
1308 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1309
1310 let mut status = plan.status.clone().unwrap_or_default();
1311 status.phase = PlanPhase::Failed;
1312 status.last_error = Some(error_message.to_string());
1313 status.failed_at = Some(crate::crd::now_rfc3339());
1314
1315 let patch = serde_json::json!({ "status": status });
1316 plans_api
1317 .patch_status(
1318 &plan_name,
1319 &PatchParams::apply("pgroles-operator"),
1320 &Patch::Merge(&patch),
1321 )
1322 .await?;
1323
1324 info!(
1325 plan = %plan_name,
1326 "marked stuck Applying plan as Failed"
1327 );
1328
1329 Ok(())
1330}
1331
1332pub async fn mark_plan_approved(
1337 client: &Client,
1338 plan: &PostgresPolicyPlan,
1339 reason: &str,
1340 message: &str,
1341) -> Result<(), ReconcileError> {
1342 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1343 let plan_name = plan.name_any();
1344 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1345
1346 let mut status = plan.status.clone().unwrap_or_default();
1347 status.phase = PlanPhase::Approved;
1348 set_plan_condition(&mut status.conditions, "Approved", "True", reason, message);
1349
1350 let patch = serde_json::json!({ "status": status });
1351 plans_api
1352 .patch_status(
1353 &plan_name,
1354 &PatchParams::apply("pgroles-operator"),
1355 &Patch::Merge(&patch),
1356 )
1357 .await?;
1358
1359 Ok(())
1360}
1361
1362pub async fn mark_plan_rejected(
1364 client: &Client,
1365 plan: &PostgresPolicyPlan,
1366) -> Result<(), ReconcileError> {
1367 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1368 let plan_name = plan.name_any();
1369 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1370
1371 let mut status = plan.status.clone().unwrap_or_default();
1372 status.phase = PlanPhase::Rejected;
1373 set_plan_condition(
1374 &mut status.conditions,
1375 "Approved",
1376 "False",
1377 "Rejected",
1378 "Plan rejected via annotation",
1379 );
1380
1381 let patch = serde_json::json!({ "status": status });
1382 plans_api
1383 .patch_status(
1384 &plan_name,
1385 &PatchParams::apply("pgroles-operator"),
1386 &Patch::Merge(&patch),
1387 )
1388 .await?;
1389
1390 Ok(())
1391}
1392
1393pub async fn mark_plan_superseded(
1395 client: &Client,
1396 plan: &PostgresPolicyPlan,
1397) -> Result<(), ReconcileError> {
1398 let namespace = plan.namespace().ok_or(ReconcileError::NoNamespace)?;
1399 let plan_name = plan.name_any();
1400 let plans_api: Api<PostgresPolicyPlan> = Api::namespaced(client.clone(), &namespace);
1401
1402 let mut status = plan.status.clone().unwrap_or_default();
1403 status.phase = PlanPhase::Superseded;
1404 set_plan_condition(
1405 &mut status.conditions,
1406 "Approved",
1407 "False",
1408 "Superseded",
1409 "Database state changed since plan was approved",
1410 );
1411
1412 let patch = serde_json::json!({ "status": status });
1413 plans_api
1414 .patch_status(
1415 &plan_name,
1416 &PatchParams::apply("pgroles-operator"),
1417 &Patch::Merge(&patch),
1418 )
1419 .await?;
1420
1421 Ok(())
1422}
1423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::CrdReconciliationMode;
    use base64::Engine as _;
    use flate2::read::GzDecoder;
    use std::io::Read;

    /// Builds a minimal namespaced plan fixture with the given `phase` and
    /// optional annotations, owned by a policy named "test-policy".
    fn test_plan(
        name: &str,
        phase: PlanPhase,
        annotations: Option<BTreeMap<String, String>>,
    ) -> PostgresPolicyPlan {
        let mut plan = PostgresPolicyPlan::new(
            name,
            PostgresPolicyPlanSpec {
                policy_ref: PolicyPlanRef {
                    name: "test-policy".to_string(),
                },
                policy_generation: 1,
                reconciliation_mode: CrdReconciliationMode::Authoritative,
                owned_roles: vec!["role-a".to_string()],
                owned_schemas: vec!["public".to_string()],
                managed_database_identity: "default/db/DATABASE_URL".to_string(),
            },
        );
        plan.metadata.namespace = Some("default".to_string());
        plan.metadata.annotations = annotations;
        plan.status = Some(PostgresPolicyPlanStatus {
            phase,
            ..Default::default()
        });
        plan
    }

    // A plan with no annotations stays Pending.
    #[test]
    fn check_plan_approval_pending_when_no_annotations() {
        let plan = test_plan("plan-1", PlanPhase::Pending, None);
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
    }

    // The approval annotation set to "true" yields Approved.
    #[test]
    fn check_plan_approval_approved_with_annotation() {
        let annotations =
            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string())]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Approved);
    }

    // The rejection annotation set to "true" yields Rejected.
    #[test]
    fn check_plan_approval_rejected_with_annotation() {
        let annotations =
            BTreeMap::from([(PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string())]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
    }

    // When both annotations are present, rejection takes precedence.
    #[test]
    fn check_plan_approval_rejected_wins_over_approved() {
        let annotations = BTreeMap::from([
            (PLAN_APPROVED_ANNOTATION.to_string(), "true".to_string()),
            (PLAN_REJECTED_ANNOTATION.to_string(), "true".to_string()),
        ]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Rejected);
    }

    // Only the exact value "true" approves; anything else is treated as Pending.
    #[test]
    fn check_plan_approval_non_true_value_is_pending() {
        let annotations =
            BTreeMap::from([(PLAN_APPROVED_ANNOTATION.to_string(), "false".to_string())]);
        let plan = test_plan("plan-1", PlanPhase::Pending, Some(annotations));
        assert_eq!(check_plan_approval(&plan), PlanApprovalState::Pending);
    }

    // Same input must always produce the same 64-hex-char (SHA-256) digest.
    #[test]
    fn compute_sql_hash_is_deterministic() {
        let sql = "CREATE ROLE test LOGIN;\nGRANT SELECT ON ALL TABLES IN SCHEMA public TO test;";
        let hash1 = compute_sql_hash(sql);
        let hash2 = compute_sql_hash(sql);
        assert_eq!(hash1, hash2);
        assert_eq!(hash1.len(), 64); }

    #[test]
    fn compute_sql_hash_differs_for_different_sql() {
        let hash1 = compute_sql_hash("CREATE ROLE a;");
        let hash2 = compute_sql_hash("CREATE ROLE b;");
        assert_ne!(hash1, hash2);
    }

    // Pinned fixture guards against accidental changes to the hash scheme
    // (e.g. normalization or algorithm swaps), which would break dedup.
    #[test]
    fn compute_sql_hash_matches_pinned_fixture() {
        assert_eq!(
            compute_sql_hash("CREATE ROLE app LOGIN;"),
            "12a9743285d98ce73cfa9c840e943fc627d1fcbce22c5206fda1b21c84c1ac9c"
        );
    }

    // Expected shape: "<policy>-plan-YYYYMMDD-HHMMSS-<12-char hash prefix>".
    #[test]
    fn generate_plan_name_has_expected_format() {
        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
        let name = generate_plan_name("my-policy", hash);
        assert!(name.starts_with("my-policy-plan-"));
        assert!(name.ends_with("-abcdef012345"));
        let suffix = name.strip_prefix("my-policy-plan-").unwrap();
        assert_eq!(suffix.len(), 28);
        assert_eq!(&suffix[8..9], "-");
        assert_eq!(&suffix[15..16], "-");
    }

    // Two calls within the same second must agree so creation is idempotent.
    #[test]
    fn generate_plan_name_is_idempotent_for_same_hash_in_same_second() {
        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
        let name1 = generate_plan_name("my-policy", hash);
        let name2 = generate_plan_name("my-policy", hash);
        assert_eq!(name1, name2);
    }

    // Multi-byte policy names must be truncated on a char boundary, not mid-byte.
    #[test]
    fn generate_plan_name_truncates_on_utf8_boundary() {
        let hash = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
        let name = generate_plan_name(&"é".repeat(140), hash);
        assert!(name.len() <= 249);
        assert!(name.ends_with("-abcdef012345"));
    }

    // SQL under the inline limit is stored directly on the plan, no ConfigMap.
    #[test]
    fn prepare_plan_sql_keeps_small_sql_inline() {
        let prepared = prepare_plan_sql("plan-1", "CREATE ROLE app LOGIN;").unwrap();

        assert!(matches!(prepared.artifact, PlanSqlArtifact::Inline(_)));
        assert_eq!(
            prepared.sql_inline(),
            Some("CREATE ROLE app LOGIN;".to_string())
        );
        assert!(prepared.sql_ref().is_none());
        assert!(!prepared.is_truncated());
    }

    // A >1 MiB brownfield-scale migration must gzip into a ConfigMap-sized
    // artifact and round-trip losslessly through decompression.
    #[test]
    fn prepare_plan_sql_compresses_large_brownfield_sized_sql() {
        let sql = brownfield_sized_sql();
        assert!(sql.len() > 1_048_576);

        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();

        let PlanSqlArtifact::CompressedConfigMap {
            key,
            compressed_sql,
            ..
        } = &prepared.artifact
        else {
            panic!("expected compressed ConfigMap artifact");
        };
        assert_eq!(key, SQL_CONFIGMAP_GZIP_KEY);
        assert!(compressed_sql.len() < MAX_CONFIGMAP_SQL_BYTES);
        assert_eq!(gunzip(compressed_sql), sql);
        assert_eq!(
            prepared.sql_ref().unwrap().compression,
            Some(SqlCompression::Gzip)
        );
        assert_eq!(prepared.original_bytes, sql.len());
        assert_eq!(prepared.stored_bytes, compressed_sql.len());
    }

    // Guards against double-encoding: the ConfigMap's binaryData must contain
    // exactly one base64 layer over the raw gzip bytes.
    #[test]
    fn configmap_binary_data_serializes_with_one_base64_layer() {
        let sql = brownfield_sized_sql();
        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();
        let PlanSqlArtifact::CompressedConfigMap {
            key,
            compressed_sql,
            ..
        } = &prepared.artifact
        else {
            panic!("expected compressed ConfigMap artifact");
        };
        let configmap = ConfigMap {
            binary_data: Some(BTreeMap::from([(
                key.clone(),
                ByteString(compressed_sql.clone()),
            )])),
            ..Default::default()
        };

        let encoded = serde_json::to_value(&configmap).unwrap()["binaryData"][key]
            .as_str()
            .unwrap()
            .to_string();
        let decoded = base64::engine::general_purpose::STANDARD
            .decode(encoded)
            .unwrap();

        assert_eq!(decoded, *compressed_sql);
        assert_eq!(gunzip(&decoded), sql);
    }

    // SQL that stays too large even after gzip falls back to a truncated
    // inline preview rather than an oversized ConfigMap.
    #[test]
    fn prepare_plan_sql_truncates_when_compressed_sql_is_still_too_large() {
        let sql = deterministic_incompressible_sql(1_400_000);
        let prepared = prepare_plan_sql("policy-plan-20260506-000000-abcdef012345", &sql).unwrap();

        let PlanSqlArtifact::TruncatedInline(preview) = &prepared.artifact else {
            panic!("expected truncated inline artifact");
        };
        assert!(preview.len() <= MAX_INLINE_SQL_BYTES);
        assert!(preview.contains("truncated"));
        assert!(prepared.sql_ref().is_none());
        assert!(prepared.is_truncated());
    }

    // '/' is not a valid Kubernetes label-value character and must be replaced.
    #[test]
    fn sanitize_label_value_replaces_slashes() {
        let sanitized = sanitize_label_value("default/db-creds/DATABASE_URL");
        assert!(!sanitized.contains('/'));
        assert_eq!(sanitized, "default_db-creds_DATABASE_URL");
    }

    // Kubernetes label values are capped at 63 characters.
    #[test]
    fn sanitize_label_value_truncates_to_63_chars() {
        let long_value = "a".repeat(100);
        let sanitized = sanitize_label_value(&long_value);
        assert!(sanitized.len() <= 63);
    }

    // A policy-labelled SQL ConfigMap with no plan label, older than the grace
    // period, is eligible for orphan cleanup.
    #[test]
    fn stale_policy_sql_configmap_without_plan_label_is_orphan() {
        let configmap = ConfigMap {
            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
                labels: Some(BTreeMap::from([(
                    LABEL_POLICY.to_string(),
                    sanitize_label_value("test-policy"),
                )])),
                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
                    jiff::Timestamp::from_second(0).unwrap(),
                )),
                ..Default::default()
            },
            ..Default::default()
        };

        assert!(is_orphan_sql_configmap(
            &configmap,
            &BTreeSet::new(),
            ORPHAN_GRACE_SECS + 1
        ));
    }

    // A ConfigMap whose plan label matches a live plan must never be cleaned
    // up, regardless of age.
    #[test]
    fn stale_policy_sql_configmap_with_known_plan_label_is_not_orphan() {
        let plan_label = sanitize_label_value("test-policy-plan-20260506-000000-abcdef012345");
        let configmap = ConfigMap {
            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
                labels: Some(BTreeMap::from([
                    (
                        LABEL_POLICY.to_string(),
                        sanitize_label_value("test-policy"),
                    ),
                    (LABEL_PLAN.to_string(), plan_label.clone()),
                ])),
                creation_timestamp: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
                    jiff::Timestamp::from_second(0).unwrap(),
                )),
                ..Default::default()
            },
            ..Default::default()
        };

        assert!(!is_orphan_sql_configmap(
            &configmap,
            &BTreeSet::from([plan_label]),
            ORPHAN_GRACE_SECS + 1
        ));
    }

    // Redacted rendering must mask password material while keeping the rest
    // of the SQL reviewable.
    #[test]
    fn render_redacted_sql_masks_passwords() {
        let changes = vec![
            pgroles_core::diff::Change::CreateRole {
                name: "app".to_string(),
                state: pgroles_core::model::RoleState {
                    login: true,
                    ..pgroles_core::model::RoleState::default()
                },
            },
            pgroles_core::diff::Change::SetPassword {
                name: "app".to_string(),
                password: "super_secret".to_string(),
            },
        ];
        let ctx = pgroles_core::sql::SqlContext::default();
        let redacted = render_redacted_sql(&changes, &ctx);

        assert!(redacted.contains("[REDACTED]"));
        assert!(!redacted.contains("super_secret"));
        assert!(redacted.contains("CREATE ROLE"));
    }

    // Full rendering keeps the credential, either verbatim or as a
    // SCRAM-SHA-256 verifier depending on the SQL context.
    #[test]
    fn render_full_sql_includes_passwords() {
        let changes = vec![pgroles_core::diff::Change::SetPassword {
            name: "app".to_string(),
            password: "super_secret".to_string(),
        }];
        let ctx = pgroles_core::sql::SqlContext::default();
        let full = render_full_sql(&changes, &ctx);

        assert!(full.contains("super_secret") || full.contains("SCRAM-SHA-256"));
    }

    // Sanity bounds: the clock helper should report a time between 2025 and 2100.
    #[test]
    fn now_epoch_secs_returns_plausible_value() {
        let now = now_epoch_secs();
        let y2025 = 1_735_689_600_i64;
        let y2100 = 4_102_444_800_i64;
        assert!(
            now > y2025 && now < y2100,
            "epoch secs {now} should be between 2025 and 2100"
        );
    }

    /// Generates a deterministic, highly compressible SQL script padded past
    /// 1 MiB — a stand-in for a large brownfield migration plan.
    fn brownfield_sized_sql() -> String {
        let mut sql = String::new();
        for schema in 0..33 {
            for profile in ["reader", "writer", "owner", "cdc"] {
                let role = format!("schema_{schema}_{profile}");
                sql.push_str(&format!(
                    "CREATE ROLE \"{role}\" LOGIN;\nCOMMENT ON ROLE \"{role}\" IS 'Generated from profile {profile} for brownfield migration schema {schema} with cdc ownership directives and review metadata';\n"
                ));
                for relkind in ["TABLES", "SEQUENCES", "FUNCTIONS"] {
                    sql.push_str(&format!(
                        "GRANT SELECT ON ALL {relkind} IN SCHEMA \"schema_{schema}\" TO \"{role}\";\n"
                    ));
                }
                for owner in 0..20 {
                    sql.push_str(&format!(
                        "ALTER DEFAULT PRIVILEGES FOR ROLE \"owner_{owner}\" IN SCHEMA \"schema_{schema}\" GRANT SELECT ON TABLES TO \"{role}\";\n"
                    ));
                }
            }
        }
        for member in 0..70 {
            sql.push_str(&format!(
                "GRANT \"group_{member}\" TO \"service_login_{}\";\n",
                member % 20
            ));
        }
        while sql.len() <= 1_100_000 {
            sql.push_str("-- brownfield migration padding for large plan regression\n");
        }
        sql
    }

    /// Generates `target_bytes` of pseudo-random alphanumeric text via an
    /// xorshift PRNG — deterministic, yet effectively incompressible, so it
    /// exercises the "gzip still too large" fallback path.
    fn deterministic_incompressible_sql(target_bytes: usize) -> String {
        let mut state = 0x1234_5678_u64;
        let mut sql = String::with_capacity(target_bytes);
        while sql.len() < target_bytes {
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            let value = (state % 62) as u8;
            let ch = match value {
                0..=9 => b'0' + value,
                10..=35 => b'a' + (value - 10),
                _ => b'A' + (value - 36),
            };
            sql.push(ch as char);
            if sql.len().is_multiple_of(120) {
                sql.push('\n');
            }
        }
        sql
    }

    /// Decompresses gzip bytes to a UTF-8 string, panicking on any failure
    /// (acceptable in tests).
    fn gunzip(bytes: &[u8]) -> String {
        let mut decoder = GzDecoder::new(bytes);
        let mut decoded = String::new();
        decoder.read_to_string(&mut decoded).unwrap();
        decoded
    }
}