1use std::sync::Arc;
15use std::time::Duration;
16
17use crate::events::{PlanEventType, publish_plan_event, publish_status_events};
18use kube::ResourceExt;
19use kube::api::{Api, Patch, PatchParams};
20use kube::runtime::controller::Action;
21use kube::runtime::finalizer::{self, Event as FinalizerEvent};
22use tracing::info;
23
24use crate::context::{ContextError, OperatorContext};
25use crate::crd::{
26 ChangeSummary, DatabaseIdentity, PolicyMode, PostgresPolicy, PostgresPolicyPlan,
27 PostgresPolicyStatus, conflict_condition, degraded_condition, drifted_condition,
28 paused_condition, ready_condition, reconciling_condition,
29};
30
/// Finalizer name passed to `finalizer::finalizer` when reconciling a policy.
const FINALIZER: &str = "pgroles.io/finalizer";

/// Requeue interval (seconds) used when a policy's interval is empty,
/// evaluates to zero, or cannot be parsed.
const DEFAULT_REQUEUE_SECS: u64 = 300;

/// Fixed part (seconds) of the requeue delay after lock contention.
const LOCK_CONTENTION_BASE_SECS: u64 = 10;

/// Upper bound (seconds, inclusive) of the jitter added on top of
/// `LOCK_CONTENTION_BASE_SECS`.
const LOCK_CONTENTION_JITTER_SECS: u64 = 20;

/// Starting delay (seconds) of the exponential backoff for transient failures.
const TRANSIENT_BACKOFF_BASE_SECS: u64 = 5;

/// Ceiling (seconds) of the exponential backoff for transient failures.
const TRANSIENT_BACKOFF_MAX_SECS: u64 = 300;

// SQLSTATE codes that are classified as non-transient SQL failures.
/// PostgreSQL SQLSTATE `42501` (insufficient_privilege).
const SQLSTATE_INSUFFICIENT_PRIVILEGE: &str = "42501";
/// PostgreSQL SQLSTATE `3F000` (invalid_schema_name).
const SQLSTATE_INVALID_SCHEMA_NAME: &str = "3F000";
/// PostgreSQL SQLSTATE `42P01` (undefined_table).
const SQLSTATE_UNDEFINED_TABLE: &str = "42P01";
/// PostgreSQL SQLSTATE `42883` (undefined_function).
const SQLSTATE_UNDEFINED_FUNCTION: &str = "42883";
/// PostgreSQL SQLSTATE `42704` (undefined_object).
const SQLSTATE_UNDEFINED_OBJECT: &str = "42704";

/// Byte budget (16 KiB) for planned SQL kept in the policy status.
/// NOTE(review): presumably enforced by `render_plan_sql_for_status`, which is
/// not visible in this part of the file — confirm there.
const MAX_PLANNED_SQL_STATUS_BYTES: usize = 16 * 1024;
58
/// Label for how a reconcile pass ended, used when recording its result.
enum ReconcileOutcome {
    /// The policy was reconciled against the database.
    Reconciled,
    /// A plan was computed without applying it.
    Planned,
    /// Reconciliation is suspended by the spec.
    Suspended,
    /// Another policy conflicts with this one.
    Conflict,
    /// A lock for the target database was held elsewhere.
    LockContention,
}

impl ReconcileOutcome {
    /// Short lowercase result label passed to the reconcile guard.
    fn result(&self) -> &'static str {
        match *self {
            Self::Reconciled => "success",
            Self::Planned => "planned",
            Self::Suspended => "suspended",
            Self::Conflict => "conflict",
            Self::LockContention => "contention",
        }
    }

    /// CamelCase reason string that accompanies [`ReconcileOutcome::result`].
    fn reason(&self) -> &'static str {
        match *self {
            Self::Reconciled => "Reconciled",
            Self::Planned => "Planned",
            Self::Suspended => "Suspended",
            Self::Conflict => "ConflictingPolicy",
            Self::LockContention => "LockContention",
        }
    }
}
88
/// How a failed reconcile should be requeued (see `retry_action`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryClass {
    /// Non-transient failure: requeue on the policy's normal interval.
    Slow,
    /// A database lock was held elsewhere: requeue after a short jittered delay.
    LockContention,
    /// Transient failure: requeue with exponential backoff.
    Transient,
}
95
/// Errors a reconcile pass can fail with.
///
/// `retry_class_for_reconcile_error` decides, per variant, how the failure is
/// requeued.
#[derive(Debug, thiserror::Error)]
pub enum ReconcileError {
    /// Failure reported by the operator context (boxed [`ContextError`]).
    #[error("context error: {0}")]
    Context(#[from] Box<ContextError>),

    /// Error converted from `pgroles_core::manifest::ManifestError`.
    #[error("manifest expansion error: {0}")]
    ManifestExpansion(#[from] pgroles_core::manifest::ManifestError),

    /// Error converted from `pgroles_inspect::InspectError`.
    #[error("database inspection error: {0}")]
    Inspect(#[from] pgroles_inspect::InspectError),

    /// Error converted from `sqlx::Error`.
    #[error("SQL execution error: {0}")]
    SqlExec(#[from] sqlx::Error),

    /// Role drops were blocked by the drop-safety assessment; the payload is
    /// the rendered list of blockers.
    #[error("{0}")]
    UnsafeRoleDrops(String),

    /// Error converted from `kube::Error`.
    #[error("Kubernetes API error: {0}")]
    Kube(#[from] kube::Error),

    /// The resource carries no namespace.
    #[error("resource has no namespace")]
    NoNamespace,

    /// The interval string could not be parsed: `(input, detail)`.
    #[error("invalid interval \"{0}\": {1}")]
    InvalidInterval(String, String),

    /// The policy spec failed validation.
    #[error("invalid spec: {0}")]
    InvalidSpec(String),

    /// The policy references database objects that do not exist; the payload
    /// is the formatted list of missing objects.
    #[error(
        "policy references objects that do not exist in target database: {0}. Either create \
         the missing objects, remove them from the policy, or verify the policy is pointing at \
         the intended database."
    )]
    MissingDatabaseObjects(String),

    /// Inspection produced diagnostics; the payload is their rendered text.
    #[error("{0}")]
    UnsatisfiableWildcardGrant(String),

    /// Another policy conflicts with this one.
    #[error("{0}")]
    ConflictingPolicy(String),

    /// A lock for the database was held elsewhere: `(database, reason)`.
    #[error("lock contention on database \"{0}\": {1}")]
    LockContention(String, String),

    /// A password Secret key resolved to an empty value.
    #[error("Secret \"{secret}\" key \"{key}\" for role \"{role}\" password is empty")]
    EmptyPasswordSecret {
        role: String,
        secret: String,
        key: String,
    },

    /// Error converted from a boxed `crate::password::PasswordError`.
    #[error("password generation error: {0}")]
    PasswordGeneration(#[from] Box<crate::password::PasswordError>),

    /// Storing plan SQL failed.
    #[error("plan SQL storage error: {0}")]
    PlanSqlStorage(String),
}
155
/// A role password resolved for this reconcile, together with the version of
/// the source it was read from.
///
/// `Debug` is implemented by hand so that the cleartext password can never end
/// up in logs, panic messages, or assertion output.
#[derive(Clone, PartialEq, Eq)]
struct ResolvedPassword {
    /// The password itself. Never printed by `Debug`.
    cleartext: String,
    /// Version marker of the password source.
    /// NOTE(review): presumably the Secret's resource version — confirm.
    source_version: String,
}

impl std::fmt::Debug for ResolvedPassword {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResolvedPassword")
            .field("cleartext", &"<redacted>")
            .field("source_version", &self.source_version)
            .finish()
    }
}
161
162fn parse_interval(interval: &str) -> Result<Duration, ReconcileError> {
164 let interval = interval.trim();
165 if interval.is_empty() {
166 return Ok(Duration::from_secs(DEFAULT_REQUEUE_SECS));
167 }
168
169 let mut total_secs: u64 = 0;
170 let mut current_num = String::new();
171
172 for ch in interval.chars() {
173 if ch.is_ascii_digit() {
174 current_num.push(ch);
175 } else {
176 let num: u64 = current_num.parse().map_err(|_| {
177 ReconcileError::InvalidInterval(
178 interval.to_string(),
179 format!("invalid number before '{ch}'"),
180 )
181 })?;
182 current_num.clear();
183
184 match ch {
185 'h' => total_secs += num * 3600,
186 'm' => total_secs += num * 60,
187 's' => total_secs += num,
188 _ => {
189 return Err(ReconcileError::InvalidInterval(
190 interval.to_string(),
191 format!("unknown unit '{ch}'"),
192 ));
193 }
194 }
195 }
196 }
197
198 if !current_num.is_empty() {
200 let num: u64 = current_num.parse().map_err(|_| {
201 ReconcileError::InvalidInterval(interval.to_string(), "trailing number".to_string())
202 })?;
203 total_secs += num;
204 }
205
206 if total_secs == 0 {
207 return Ok(Duration::from_secs(DEFAULT_REQUEUE_SECS));
208 }
209
210 Ok(Duration::from_secs(total_secs))
211}
212
213pub async fn reconcile(
217 resource: Arc<PostgresPolicy>,
218 ctx: Arc<OperatorContext>,
219) -> Result<Action, finalizer::Error<ReconcileError>> {
220 let api: Api<PostgresPolicy> = Api::namespaced(
221 ctx.kube_client.clone(),
222 resource.namespace().as_deref().unwrap_or("default"),
223 );
224
225 finalizer::finalizer(&api, FINALIZER, resource, |event| async {
226 match event {
227 FinalizerEvent::Apply(resource) => reconcile_apply(&resource, &ctx).await,
228 FinalizerEvent::Cleanup(resource) => reconcile_cleanup(&resource, &ctx).await,
229 }
230 })
231 .await
232}
233
234pub fn error_policy(
236 resource: Arc<PostgresPolicy>,
237 error: &finalizer::Error<ReconcileError>,
238 _ctx: Arc<OperatorContext>,
239) -> Action {
240 retry_action(&resource, error)
241}
242
243fn retry_action(resource: &PostgresPolicy, error: &finalizer::Error<ReconcileError>) -> Action {
244 match retry_class(error) {
245 RetryClass::LockContention => {
246 if let finalizer::Error::ApplyFailed(ReconcileError::LockContention(db, reason)) = error
247 {
248 tracing::info!(database = %db, reason = %reason, "requeuing due to lock contention");
249 }
250 requeue_with_jitter()
251 }
252 RetryClass::Slow => {
253 let delay = slow_retry_delay(resource);
254 tracing::info!(
255 delay_secs = delay.as_secs(),
256 error = %error,
257 "requeuing on normal interval for non-transient failure"
258 );
259 Action::requeue(delay)
260 }
261 RetryClass::Transient => {
262 let attempts = next_transient_failure_count(resource);
263 let delay = transient_backoff_delay(attempts);
264 tracing::warn!(
265 attempts,
266 delay_secs = delay.as_secs(),
267 error = %error,
268 "requeuing with exponential backoff after transient failure"
269 );
270 Action::requeue(delay)
271 }
272 }
273}
274
275fn requeue_with_jitter() -> Action {
277 let delay = jitter_delay();
278 tracing::debug!(delay_secs = delay.as_secs(), "requeue with jitter");
279 Action::requeue(delay)
280}
281
282fn jitter_delay() -> Duration {
287 let nanos = std::time::SystemTime::now()
291 .duration_since(std::time::UNIX_EPOCH)
292 .unwrap_or_default()
293 .subsec_nanos();
294 let thread_entropy = {
295 use std::hash::{Hash, Hasher};
296 let mut hasher = std::collections::hash_map::DefaultHasher::new();
297 std::thread::current().id().hash(&mut hasher);
298 hasher.finish() as u32
299 };
300 let jitter_secs = ((nanos ^ thread_entropy) as u64) % (LOCK_CONTENTION_JITTER_SECS + 1);
301 Duration::from_secs(LOCK_CONTENTION_BASE_SECS + jitter_secs)
302}
303
304fn transient_backoff_delay(attempts: u32) -> Duration {
305 let exponent = attempts.saturating_sub(1).min(10);
306 let base_delay = TRANSIENT_BACKOFF_BASE_SECS
307 .saturating_mul(1_u64 << exponent)
308 .min(TRANSIENT_BACKOFF_MAX_SECS);
309 let remaining_headroom = TRANSIENT_BACKOFF_MAX_SECS.saturating_sub(base_delay);
310 let jitter_window = remaining_headroom.min((base_delay / 2).max(1));
311 let jitter_secs = if jitter_window == 0 {
312 0
313 } else {
314 pseudo_random_window(jitter_window)
315 };
316 Duration::from_secs((base_delay + jitter_secs).min(TRANSIENT_BACKOFF_MAX_SECS))
317}
318
/// Returns a cheap pseudo-random value in `0..=window_secs`.
///
/// The value mixes the sub-second nanoseconds of the system clock with a hash
/// of the current thread id. It is meant for spreading retries apart, not for
/// anything that needs real randomness. A zero window always yields zero.
fn pseudo_random_window(window_secs: u64) -> u64 {
    use std::hash::{Hash, Hasher};

    if window_secs == 0 {
        return 0;
    }

    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::thread::current().id().hash(&mut hasher);
    // Keep only the low 32 bits so the hash lines up with the nanosecond value.
    let thread_bits = hasher.finish() as u32;

    let clock_bits = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();

    u64::from(clock_bits ^ thread_bits) % (window_secs + 1)
}
335
336fn retry_class(error: &finalizer::Error<ReconcileError>) -> RetryClass {
337 match error {
338 finalizer::Error::ApplyFailed(reconcile_error) => {
339 retry_class_for_reconcile_error(reconcile_error)
340 }
341 finalizer::Error::CleanupFailed(_)
342 | finalizer::Error::AddFinalizer(_)
343 | finalizer::Error::RemoveFinalizer(_)
344 | finalizer::Error::UnnamedObject
345 | finalizer::Error::InvalidFinalizer => RetryClass::Transient,
346 }
347}
348
349fn retry_class_for_reconcile_error(error: &ReconcileError) -> RetryClass {
350 match error {
351 ReconcileError::LockContention(_, _) => RetryClass::LockContention,
352 ReconcileError::ManifestExpansion(_)
353 | ReconcileError::InvalidInterval(_, _)
354 | ReconcileError::InvalidSpec(_)
355 | ReconcileError::MissingDatabaseObjects(_)
356 | ReconcileError::UnsatisfiableWildcardGrant(_)
357 | ReconcileError::ConflictingPolicy(_)
358 | ReconcileError::UnsafeRoleDrops(_)
359 | ReconcileError::EmptyPasswordSecret { .. }
360 | ReconcileError::NoNamespace
361 | ReconcileError::PlanSqlStorage(_) => RetryClass::Slow,
362 ReconcileError::PasswordGeneration(err) => {
363 if err.is_transient() {
364 RetryClass::Transient
365 } else {
366 RetryClass::Slow
367 }
368 }
369 ReconcileError::Context(context) => match context.as_ref() {
370 ContextError::SecretMissing { .. } => RetryClass::Slow,
371 ContextError::SecretFetch { .. } => {
372 if context.is_secret_fetch_non_transient() {
373 RetryClass::Slow
374 } else {
375 RetryClass::Transient
376 }
377 }
378 ContextError::DatabaseConnect { .. } => RetryClass::Transient,
379 ContextError::EmptyResolvedValue { .. }
380 | ContextError::InvalidResolvedSslMode { .. } => RetryClass::Slow,
381 },
382 ReconcileError::Inspect(error) => {
383 if inspect_error_is_non_transient(error) {
384 RetryClass::Slow
385 } else {
386 RetryClass::Transient
387 }
388 }
389 ReconcileError::SqlExec(error) => {
390 if sqlx_error_is_non_transient(error) {
391 RetryClass::Slow
392 } else {
393 RetryClass::Transient
394 }
395 }
396 ReconcileError::Kube(_) => RetryClass::Transient,
397 }
398}
399
400fn inspect_error_is_non_transient(error: &pgroles_inspect::InspectError) -> bool {
401 match error {
402 pgroles_inspect::InspectError::Database(error) => sqlx_error_is_non_transient(error),
403 }
404}
405
/// Coarse classification of a `sqlx::Error`, derived from its SQLSTATE code
/// by `classify_sqlx_error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SqlErrorKind {
    /// SQLSTATE `42501`: the connected role lacks a required privilege.
    InsufficientPrivileges,
    /// SQLSTATE `3F000`, `42P01`, `42883`, or `42704`: a referenced schema,
    /// table, function, or other object does not exist.
    MissingDatabaseObject,
    /// Any other error, including errors that carry no SQLSTATE code.
    Transient,
}
419
420fn classify_sqlx_error(error: &sqlx::Error) -> SqlErrorKind {
421 match error
422 .as_database_error()
423 .and_then(|database_error| database_error.code())
424 .as_deref()
425 {
426 Some(SQLSTATE_INSUFFICIENT_PRIVILEGE) => SqlErrorKind::InsufficientPrivileges,
427 Some(SQLSTATE_INVALID_SCHEMA_NAME)
428 | Some(SQLSTATE_UNDEFINED_TABLE)
429 | Some(SQLSTATE_UNDEFINED_FUNCTION)
430 | Some(SQLSTATE_UNDEFINED_OBJECT) => SqlErrorKind::MissingDatabaseObject,
431 _ => SqlErrorKind::Transient,
432 }
433}
434
435fn sqlx_error_is_non_transient(error: &sqlx::Error) -> bool {
436 !matches!(classify_sqlx_error(error), SqlErrorKind::Transient)
437}
438
439fn next_transient_failure_count(resource: &PostgresPolicy) -> u32 {
440 resource
441 .status
442 .as_ref()
443 .map(|status| status.transient_failure_count.max(0) as u32)
444 .unwrap_or(0)
445 .saturating_add(1)
446}
447
448fn slow_retry_delay(resource: &PostgresPolicy) -> Duration {
449 parse_interval(&resource.spec.interval)
450 .unwrap_or_else(|_| Duration::from_secs(DEFAULT_REQUEUE_SECS))
451}
452
453fn referenced_schema_names(
459 expanded: &pgroles_core::manifest::ExpandedManifest,
460) -> std::collections::BTreeSet<String> {
461 let mut names: std::collections::BTreeSet<String> = expanded
462 .schemas
463 .iter()
464 .map(|schema| schema.name.clone())
465 .collect();
466 for grant in &expanded.grants {
467 if grant.object.object_type == pgroles_core::manifest::ObjectType::Schema
468 && let Some(name) = &grant.object.name
469 {
470 names.insert(name.clone());
471 }
472 if let Some(schema) = &grant.object.schema {
473 names.insert(schema.clone());
474 }
475 }
476 for dp in &expanded.default_privileges {
477 names.insert(dp.schema.clone());
478 }
479 names
480}
481
482fn declared_schema_names(
483 expanded: &pgroles_core::manifest::ExpandedManifest,
484) -> std::collections::BTreeSet<String> {
485 expanded
486 .schemas
487 .iter()
488 .map(|schema| schema.name.clone())
489 .collect()
490}
491
/// Returns `true` for `information_schema` and for any name starting with
/// `pg_`.
fn is_system_schema(name: &str) -> bool {
    matches!(name, "information_schema") || name.strip_prefix("pg_").is_some()
}
500
501async fn validate_referenced_schemas_exist(
508 pool: &sqlx::PgPool,
509 expanded: &pgroles_core::manifest::ExpandedManifest,
510) -> Result<(), ReconcileError> {
511 let referenced = externally_required_schema_names(expanded);
512 if referenced.is_empty() {
513 return Ok(());
514 }
515 let existing = pgroles_inspect::fetch_existing_schemas(pool).await?;
516 let missing: Vec<String> = referenced
517 .into_iter()
518 .filter(|name| !existing.contains(name))
519 .collect();
520 if missing.is_empty() {
521 Ok(())
522 } else {
523 let formatted = missing
524 .iter()
525 .map(|name| format!("schema \"{name}\""))
526 .collect::<Vec<_>>()
527 .join(", ");
528 Err(ReconcileError::MissingDatabaseObjects(formatted))
529 }
530}
531
532fn externally_required_schema_names(
533 expanded: &pgroles_core::manifest::ExpandedManifest,
534) -> std::collections::BTreeSet<String> {
535 let declared = declared_schema_names(expanded);
536 referenced_schema_names(expanded)
537 .into_iter()
538 .filter(|name| !is_system_schema(name) && !declared.contains(name))
539 .collect()
540}
541
/// Runs one apply-side reconcile and records its outcome.
///
/// Wraps `reconcile_apply_inner` with bookkeeping:
///
/// * on success, the outcome's result/reason are recorded on the reconcile guard;
/// * on lock contention, the contention is counted and the error is returned
///   unchanged, without writing a failure status;
/// * on any other error, the matching observability counter is bumped, the
///   failure is written to the resource status (best effort), and the error is
///   returned.
///
/// # Errors
///
/// Returns [`ReconcileError::NoNamespace`] when the resource has no namespace,
/// otherwise whatever `reconcile_apply_inner` failed with.
async fn reconcile_apply(
    resource: &PostgresPolicy,
    ctx: &OperatorContext,
) -> Result<Action, ReconcileError> {
    let reconcile_guard = ctx.observability.start_reconcile();

    let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
    let identity = DatabaseIdentity::from_connection(&namespace, &resource.spec.connection);

    match reconcile_apply_inner(resource, ctx, &identity).await {
        Ok((action, outcome)) => {
            reconcile_guard.record_result(outcome.result(), outcome.reason());
            Ok(action)
        }
        Err(ReconcileError::LockContention(db, reason)) => {
            ctx.observability.record_lock_contention();
            reconcile_guard.record_result(
                ReconcileOutcome::LockContention.result(),
                ReconcileOutcome::LockContention.reason(),
            );
            tracing::info!(database = %db, %reason, "lock contention — will requeue");
            // Rebuild the error from the parts moved out by the match pattern.
            Err(ReconcileError::LockContention(db, reason))
        }
        Err(err) => {
            let error_message = err.to_string();
            let error_reason = err.reason();
            // Only transient failures advance the backoff counter kept in status.
            let is_transient_failure =
                retry_class_for_reconcile_error(&err) == RetryClass::Transient;
            // Only an unsatisfiable wildcard grant clears the current plan reference.
            let clear_current_plan_ref =
                matches!(&err, ReconcileError::UnsatisfiableWildcardGrant(_));
            // Bump the counter that corresponds to the failure reason, if any.
            match error_reason {
                "DatabaseConnectionFailed" => {
                    ctx.observability.record_database_connection_failure()
                }
                "InvalidSpec" => ctx.observability.record_invalid_spec(),
                "ConflictingPolicy" => ctx.observability.record_policy_conflict(),
                "ApplyFailed" | "MissingDatabaseObject" | "UnsatisfiableWildcardGrant" => {
                    ctx.observability.record_apply_result("error")
                }
                _ => {}
            }
            reconcile_guard.record_result("error", error_reason);
            // Best effort: a failed status write is logged and must not hide
            // the original error.
            if let Err(status_err) = update_status(ctx, resource, |status| {
                mark_reconcile_failure_status(
                    status,
                    error_reason,
                    &error_message,
                    is_transient_failure,
                    clear_current_plan_ref,
                );
            })
            .await
            {
                tracing::warn!(%status_err, "failed to update degraded status");
            }
            Err(err)
        }
    }
}
617
618fn mark_reconcile_failure_status(
619 status: &mut PostgresPolicyStatus,
620 error_reason: &str,
621 error_message: &str,
622 is_transient_failure: bool,
623 clear_current_plan_ref: bool,
624) {
625 status.set_condition(ready_condition(false, error_reason, error_message));
626 status.set_condition(degraded_condition(error_reason, error_message));
627 status.conditions.retain(|c| {
628 c.condition_type != "Reconciling"
629 && c.condition_type != "Paused"
630 && c.condition_type != "Drifted"
631 && c.condition_type != "Conflict"
632 });
633 status.change_summary = None;
634 status.planned_sql = None;
635 status.planned_sql_truncated = false;
636 if clear_current_plan_ref {
637 status.current_plan_ref = None;
638 }
639 status.last_error = Some(error_message.to_string());
640 if is_transient_failure {
641 status.transient_failure_count += 1;
642 } else {
643 status.transient_failure_count = 0;
644 }
645}
646
/// Performs the apply-side reconcile steps up to and including the locked
/// apply, returning the requeue action together with the outcome label.
///
/// Steps, in order:
///
/// 1. parse the requeue interval; if the spec is suspended, record that in the
///    status and return early;
/// 2. mark the resource as reconciling and validate the connection and
///    password specs;
/// 3. publish the ownership claims in the status and stop with a `Conflict`
///    outcome if another policy conflicts;
/// 4. build the manifest, expand it, and derive the desired role graph;
/// 5. obtain a connection pool, then take the in-process database lock and the
///    PostgreSQL advisory lock, failing with `LockContention` if either is
///    already held;
/// 6. run `apply_under_lock`, release the advisory lock, run best-effort plan
///    cleanup, and return the apply result.
///
/// # Errors
///
/// Propagates any [`ReconcileError`] raised by the steps above.
async fn reconcile_apply_inner(
    resource: &PostgresPolicy,
    ctx: &OperatorContext,
    identity: &DatabaseIdentity,
) -> Result<(Action, ReconcileOutcome), ReconcileError> {
    let name = resource.name_any();
    let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;

    let spec = &resource.spec;
    let requeue_interval = parse_interval(&spec.interval)?;
    let generation = resource.metadata.generation;

    if spec.suspend {
        // Suspended: record the paused state and come back on the normal interval.
        update_status(ctx, resource, |status| {
            status.set_condition(paused_condition("Reconciliation suspended by spec"));
            status.set_condition(ready_condition(
                false,
                "Suspended",
                "Reconciliation suspended by spec",
            ));
            status
                .conditions
                .retain(|c| c.condition_type != "Reconciling" && c.condition_type != "Drifted");
            status.last_attempted_generation = generation;
            status.last_error = None;
            status.planned_sql = None;
            status.planned_sql_truncated = false;
            status.transient_failure_count = 0;
        })
        .await?;
        info!(name, namespace, "reconciliation suspended, requeuing");
        return Ok((
            Action::requeue(requeue_interval),
            ReconcileOutcome::Suspended,
        ));
    }

    info!(name, namespace, "starting reconciliation");

    // Mark the attempt as in progress before doing any validation or database work.
    update_status(ctx, resource, |status| {
        status.set_condition(reconciling_condition("Reconciliation in progress"));
        status
            .conditions
            .retain(|c| c.condition_type != "Paused" && c.condition_type != "Drifted");
        status.last_attempted_generation = generation;
    })
    .await?;

    spec.validate_connection_spec()
        .map_err(|err| ReconcileError::InvalidSpec(err.to_string()))?;
    spec.validate_password_specs(&name)
        .map_err(|err| ReconcileError::InvalidSpec(err.to_string()))?;

    // Publish what this policy claims to own; conflict detection runs right after.
    let ownership = spec.ownership_claims()?;
    update_status(ctx, resource, |status| {
        status.managed_database_identity = Some(identity.as_str().to_string());
        status.owned_roles = ownership.roles.iter().cloned().collect();
        status.owned_schemas = ownership.schemas.iter().cloned().collect();
    })
    .await?;

    if let Some(conflict_message) =
        detect_policy_conflict(ctx, resource, identity, &ownership).await?
    {
        // A conflict is reported through the status and a normal requeue,
        // not through an error return.
        update_status(ctx, resource, |status| {
            status.set_condition(ready_condition(
                false,
                "ConflictingPolicy",
                &conflict_message,
            ));
            status.set_condition(conflict_condition("ConflictingPolicy", &conflict_message));
            status.set_condition(degraded_condition("ConflictingPolicy", &conflict_message));
            status
                .conditions
                .retain(|c| c.condition_type != "Reconciling" && c.condition_type != "Drifted");
            status.change_summary = None;
            status.planned_sql = None;
            status.planned_sql_truncated = false;
            status.last_error = Some(conflict_message.clone());
            status.transient_failure_count = 0;
        })
        .await?;
        ctx.observability.record_policy_conflict();
        info!(name, namespace, %conflict_message, "reconciliation blocked by conflicting policy");
        return Ok((
            Action::requeue(requeue_interval),
            ReconcileOutcome::Conflict,
        ));
    }

    let manifest = spec.to_policy_manifest();

    let expanded = pgroles_core::manifest::expand_manifest(&manifest)?;

    let default_owner = manifest.default_owner.as_deref();
    let desired = pgroles_core::model::RoleGraph::from_expanded(&expanded, default_owner)?;

    let pool = ctx
        .get_or_create_pool(&namespace, &spec.connection)
        .await
        .map_err(Box::new)?;

    // In-process lock first; the guard is held until this function returns.
    let _db_lock = match ctx.try_lock_database(identity.as_str()).await {
        Some(guard) => guard,
        None => {
            return Err(ReconcileError::LockContention(
                identity.as_str().to_string(),
                "in-process lock held by another reconcile".to_string(),
            ));
        }
    };

    // Then the PostgreSQL advisory lock for the same database identity.
    let advisory_lock = match crate::advisory::try_acquire(&pool, identity.as_str()).await {
        Ok(Some(lock)) => lock,
        Ok(None) => {
            return Err(ReconcileError::LockContention(
                identity.as_str().to_string(),
                "PostgreSQL advisory lock held by another session".to_string(),
            ));
        }
        Err(err) => {
            tracing::warn!(%err, "failed to acquire advisory lock — treating as connection error");
            return Err(ReconcileError::SqlExec(err));
        }
    };

    // The result is kept so the advisory lock is released on success and on failure.
    let result = apply_under_lock(
        resource,
        ctx,
        &pool,
        &manifest,
        &expanded,
        &desired,
        generation,
        requeue_interval,
        &name,
        &namespace,
        identity,
    )
    .await;

    advisory_lock.release().await;

    // Plan cleanup runs whether or not the apply succeeded.
    crate::plan::cleanup_old_plans_best_effort(&ctx.kube_client, resource, None).await;

    result
}
825
826#[allow(clippy::too_many_arguments)]
830async fn apply_under_lock(
831 resource: &PostgresPolicy,
832 ctx: &OperatorContext,
833 pool: &sqlx::PgPool,
834 manifest: &pgroles_core::manifest::PolicyManifest,
835 expanded: &pgroles_core::manifest::ExpandedManifest,
836 desired: &pgroles_core::model::RoleGraph,
837 generation: Option<i64>,
838 requeue_interval: Duration,
839 name: &str,
840 namespace: &str,
841 identity: &DatabaseIdentity,
842) -> Result<(Action, ReconcileOutcome), ReconcileError> {
843 if let Some(stuck_plan) =
845 crate::plan::get_plan_by_phase(&ctx.kube_client, resource, crate::crd::PlanPhase::Applying)
846 .await?
847 {
848 let applying_since_secs = stuck_plan
849 .status
850 .as_ref()
851 .and_then(|s| s.applying_since.as_deref())
852 .and_then(parse_rfc3339_to_epoch_secs);
853 if let Some(since_secs) = applying_since_secs {
854 let now_secs = std::time::SystemTime::now()
855 .duration_since(std::time::UNIX_EPOCH)
856 .unwrap_or_default()
857 .as_secs();
858 let elapsed_secs = now_secs.saturating_sub(since_secs);
859 let stuck_threshold_secs = 5 * 60; if elapsed_secs > stuck_threshold_secs {
861 tracing::warn!(
862 plan = %stuck_plan.name_any(),
863 elapsed_secs,
864 "detected stuck Applying plan — marking as Failed"
865 );
866 crate::plan::mark_plan_failed(
867 &ctx.kube_client,
868 &stuck_plan,
869 "execution interrupted: operator restarted during apply",
870 )
871 .await?;
872 }
873 }
874 }
875
876 let has_database_grants = expanded
878 .grants
879 .iter()
880 .any(|g| g.object.object_type == pgroles_core::manifest::ObjectType::Database);
881 let inspect_config =
882 pgroles_inspect::InspectConfig::from_expanded(expanded, has_database_grants)
883 .with_additional_roles(
884 manifest
885 .retirements
886 .iter()
887 .map(|retirement| retirement.role.clone()),
888 );
889 let inspection = pgroles_inspect::inspect_with_diagnostics(pool, &inspect_config).await?;
890 ctx.observability.record_inspection(&inspection.stats);
891 if !inspection.diagnostics.is_empty() {
892 return Err(ReconcileError::UnsatisfiableWildcardGrant(
893 inspection.diagnostics.to_string(),
894 ));
895 }
896 let current = inspection.graph;
897
898 validate_referenced_schemas_exist(pool, expanded).await?;
903
904 let reconciliation_mode: pgroles_core::diff::ReconciliationMode =
907 resource.spec.reconciliation_mode.into();
908 tracing::info!(%reconciliation_mode, "reconciliation mode");
909 let mut changes = pgroles_core::diff::filter_changes(
910 pgroles_core::diff::apply_role_retirements(
911 pgroles_core::diff::diff(¤t, desired),
912 &manifest.retirements,
913 ),
914 reconciliation_mode,
915 );
916
917 let resolved_passwords = resolve_passwords_from_secrets(ctx, resource, namespace).await?;
918 let (password_changes, applied_password_source_versions) =
919 select_password_changes(&changes, &resolved_passwords, resource.status.as_ref());
920 if !password_changes.is_empty() {
921 changes = pgroles_core::diff::inject_password_changes(changes, &password_changes);
922 }
923 let dropped_roles: Vec<String> = changes
924 .iter()
925 .filter_map(|change| match change {
926 pgroles_core::diff::Change::DropRole { name } => Some(name.clone()),
927 _ => None,
928 })
929 .collect();
930 let drop_safety = pgroles_inspect::inspect_drop_role_safety(pool, &dropped_roles)
931 .await?
932 .assess(&manifest.retirements);
933 if !drop_safety.warnings.is_empty() {
934 tracing::info!(warnings = %drop_safety.warnings, "role-drop cleanup warnings");
935 }
936 if drop_safety.has_blockers() {
937 return Err(ReconcileError::UnsafeRoleDrops(
938 drop_safety.blockers.to_string(),
939 ));
940 }
941
942 let summary = summarize_changes(&changes);
943 let sql_ctx = detect_sql_context(pool, &inspect_config).await?;
944 let (planned_sql, planned_sql_truncated) = render_plan_sql_for_status(&changes, &sql_ctx);
945
946 let effective_approval = resource.spec.effective_approval();
947
948 if resource.spec.mode == PolicyMode::Plan {
949 let drift_detected = !changes.is_empty();
950 let ready_message = if drift_detected {
951 format!("Plan computed; {} change(s) pending", summary.total)
952 } else {
953 "Plan computed; database already matches desired state".to_string()
954 };
955 let drift_reason = if drift_detected {
956 "DriftDetected"
957 } else {
958 "InSync"
959 };
960 let drift_message = if drift_detected {
961 format!("{} planned change(s) pending review", summary.total)
962 } else {
963 "No pending changes".to_string()
964 };
965
966 ctx.observability
967 .record_plan_result(if drift_detected { "drift" } else { "clean" });
968 ctx.observability
969 .record_planned_changes(summary.total.max(0) as usize);
970
971 let mut plan_ref_name = None;
973 if drift_detected {
974 let creation_result = crate::plan::create_or_update_plan(
975 &ctx.kube_client,
976 resource,
977 &changes,
978 &sql_ctx,
979 &inspect_config,
980 resource.spec.reconciliation_mode,
981 identity.as_str(),
982 &summary,
983 )
984 .await?;
985 let plan_name = creation_result.plan_name().to_string();
986
987 if creation_result.is_created() {
989 let plans_api: Api<PostgresPolicyPlan> =
990 Api::namespaced(ctx.kube_client.clone(), namespace);
991 let created_plan = plans_api.get(&plan_name).await?;
992 emit_plan_event(
993 ctx,
994 resource,
995 &created_plan,
996 PlanEventType::Created {
997 change_count: summary.total,
998 },
999 )
1000 .await;
1001 }
1002
1003 crate::plan::update_policy_plan_ref(&ctx.kube_client, resource, &plan_name).await?;
1004
1005 plan_ref_name = Some(plan_name);
1006 }
1007
1008 update_status(ctx, resource, |status| {
1010 status.set_condition(ready_condition(true, "Planned", &ready_message));
1011 status.set_condition(drifted_condition(
1012 drift_detected,
1013 drift_reason,
1014 &drift_message,
1015 ));
1016 status.conditions.retain(|c| {
1017 c.condition_type != "Reconciling"
1018 && c.condition_type != "Degraded"
1019 && c.condition_type != "Conflict"
1020 && c.condition_type != "Paused"
1021 });
1022 status.observed_generation = generation;
1023 status.last_attempted_generation = generation;
1024 status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1025 status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1026 status.change_summary = Some(summary.clone());
1027 status.last_reconcile_mode = Some(PolicyMode::Plan);
1028 status.planned_sql = planned_sql.clone();
1029 status.planned_sql_truncated = planned_sql_truncated;
1030 status.last_error = None;
1031 status.transient_failure_count = 0;
1032 if let Some(ref plan_name) = plan_ref_name {
1033 status.current_plan_ref = Some(crate::crd::PlanReference {
1034 name: plan_name.clone(),
1035 });
1036 }
1037 })
1038 .await?;
1039
1040 info!(
1041 name,
1042 namespace,
1043 total = summary.total,
1044 drift_detected,
1045 "plan reconciliation complete"
1046 );
1047 return Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned));
1048 }
1049
1050 match effective_approval {
1052 crate::crd::ApprovalMode::Auto => {
1053 if !changes.is_empty() {
1056 let creation_result = crate::plan::create_or_update_plan(
1057 &ctx.kube_client,
1058 resource,
1059 &changes,
1060 &sql_ctx,
1061 &inspect_config,
1062 resource.spec.reconciliation_mode,
1063 identity.as_str(),
1064 &summary,
1065 )
1066 .await?;
1067 let plan_name = creation_result.plan_name().to_string();
1068
1069 let plans_api: Api<PostgresPolicyPlan> =
1071 Api::namespaced(ctx.kube_client.clone(), namespace);
1072 let plan = plans_api.get(&plan_name).await?;
1073
1074 if creation_result.is_created() {
1075 emit_plan_event(
1076 ctx,
1077 resource,
1078 &plan,
1079 PlanEventType::Created {
1080 change_count: summary.total,
1081 },
1082 )
1083 .await;
1084 }
1085
1086 crate::plan::mark_plan_approved(
1087 &ctx.kube_client,
1088 &plan,
1089 "AutoApproved",
1090 "Plan auto-approved by policy approval mode",
1091 )
1092 .await?;
1093
1094 let plan = plans_api.get(&plan_name).await?;
1096 emit_plan_event(ctx, resource, &plan, PlanEventType::Approved).await;
1097 emit_plan_event(ctx, resource, &plan, PlanEventType::ApplyStarted).await;
1098
1099 match crate::plan::execute_plan(&ctx.kube_client, &plan, pool, &sql_ctx, &changes)
1100 .await
1101 {
1102 Ok(()) => {
1103 emit_plan_event(ctx, resource, &plan, PlanEventType::ApplySucceeded).await;
1104 }
1105 Err(err) => {
1106 emit_plan_event(
1107 ctx,
1108 resource,
1109 &plan,
1110 PlanEventType::ApplyFailed {
1111 error: err.to_string(),
1112 },
1113 )
1114 .await;
1115 return Err(err);
1116 }
1117 }
1118
1119 ctx.observability.record_apply_result("success");
1120
1121 crate::plan::update_policy_plan_ref(&ctx.kube_client, resource, &plan_name).await?;
1122
1123 info!(
1124 name,
1125 namespace,
1126 total = summary.total,
1127 plan = %plan_name,
1128 "auto-approved plan applied"
1129 );
1130 } else {
1131 info!(name, namespace, "no changes needed");
1132 }
1133
1134 update_status(ctx, resource, |status| {
1136 status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
1137 status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
1138 status.conditions.retain(|c| {
1139 c.condition_type != "Reconciling"
1140 && c.condition_type != "Degraded"
1141 && c.condition_type != "Conflict"
1142 && c.condition_type != "Paused"
1143 });
1144 status.observed_generation = generation;
1145 status.last_attempted_generation = generation;
1146 status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1147 status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1148 status.change_summary = Some(summary);
1149 status.last_reconcile_mode = Some(PolicyMode::Apply);
1150 status.planned_sql = None;
1151 status.planned_sql_truncated = false;
1152 status.last_error = None;
1153 status.applied_password_source_versions = applied_password_source_versions;
1154 status.transient_failure_count = 0;
1155 })
1156 .await?;
1157
1158 Ok((
1159 Action::requeue(requeue_interval),
1160 ReconcileOutcome::Reconciled,
1161 ))
1162 }
1163 crate::crd::ApprovalMode::Manual => {
1164 if let Some(current_plan) =
1168 crate::plan::get_current_actionable_plan(&ctx.kube_client, resource).await?
1169 {
1170 let approval_state = crate::plan::check_plan_approval(¤t_plan);
1171
1172 match approval_state {
1173 crate::plan::PlanApprovalState::Approved => {
1174 let fresh_sql = crate::plan::render_full_sql(&changes, &sql_ctx);
1177 let fresh_hash = crate::plan::compute_sql_hash(&fresh_sql);
1178 let stored_hash = current_plan
1179 .status
1180 .as_ref()
1181 .and_then(|s| s.sql_hash.as_deref());
1182
1183 if stored_hash != Some(&fresh_hash) {
1184 tracing::warn!(
1186 plan = %current_plan.name_any(),
1187 stored_hash = ?stored_hash,
1188 fresh_hash = %fresh_hash,
1189 "approved plan superseded: database state changed since approval"
1190 );
1191
1192 crate::plan::mark_plan_superseded(&ctx.kube_client, ¤t_plan)
1193 .await?;
1194
1195 let new_creation_result = crate::plan::create_or_update_plan(
1197 &ctx.kube_client,
1198 resource,
1199 &changes,
1200 &sql_ctx,
1201 &inspect_config,
1202 resource.spec.reconciliation_mode,
1203 identity.as_str(),
1204 &summary,
1205 )
1206 .await?;
1207 let new_plan_name = new_creation_result.plan_name().to_string();
1208
1209 if new_creation_result.is_created() {
1210 let plans_api: Api<PostgresPolicyPlan> =
1211 Api::namespaced(ctx.kube_client.clone(), namespace);
1212 let new_plan = plans_api.get(&new_plan_name).await?;
1213 emit_plan_event(
1214 ctx,
1215 resource,
1216 &new_plan,
1217 PlanEventType::Created {
1218 change_count: summary.total,
1219 },
1220 )
1221 .await;
1222 }
1223
1224 crate::plan::update_policy_plan_ref(
1225 &ctx.kube_client,
1226 resource,
1227 &new_plan_name,
1228 )
1229 .await?;
1230
1231 let msg = format!(
1232 "Plan {} superseded (DB state changed); new plan {} created with {} change(s) awaiting approval",
1233 current_plan.name_any(),
1234 new_plan_name,
1235 summary.total,
1236 );
1237 update_status(ctx, resource, |status| {
1238 status.set_condition(ready_condition(true, "Planned", &msg));
1239 status.set_condition(drifted_condition(
1240 true,
1241 "DriftDetected",
1242 &format!("{} planned change(s) pending review", summary.total),
1243 ));
1244 status.conditions.retain(|c| {
1245 c.condition_type != "Reconciling"
1246 && c.condition_type != "Degraded"
1247 && c.condition_type != "Conflict"
1248 && c.condition_type != "Paused"
1249 });
1250 status.last_attempted_generation = generation;
1251 status.change_summary = Some(summary.clone());
1252 status.last_reconcile_mode = Some(PolicyMode::Apply);
1253 status.planned_sql = planned_sql.clone();
1254 status.planned_sql_truncated = planned_sql_truncated;
1255 status.last_error = None;
1256 status.transient_failure_count = 0;
1257 status.current_plan_ref = Some(crate::crd::PlanReference {
1258 name: new_plan_name.clone(),
1259 });
1260 })
1261 .await?;
1262
1263 return Ok((
1264 Action::requeue(requeue_interval),
1265 ReconcileOutcome::Planned,
1266 ));
1267 }
1268
1269 info!(
1271 name,
1272 namespace,
1273 plan = %current_plan.name_any(),
1274 "executing manually approved plan"
1275 );
1276
1277 emit_plan_event(ctx, resource, ¤t_plan, PlanEventType::Approved)
1278 .await;
1279
1280 crate::plan::mark_plan_approved(
1281 &ctx.kube_client,
1282 ¤t_plan,
1283 "ManuallyApproved",
1284 "Plan approved via annotation",
1285 )
1286 .await?;
1287
1288 let plans_api: Api<PostgresPolicyPlan> =
1289 Api::namespaced(ctx.kube_client.clone(), namespace);
1290 let plan = plans_api.get(¤t_plan.name_any()).await?;
1291
1292 emit_plan_event(ctx, resource, &plan, PlanEventType::ApplyStarted).await;
1293
1294 match crate::plan::execute_plan(
1295 &ctx.kube_client,
1296 &plan,
1297 pool,
1298 &sql_ctx,
1299 &changes,
1300 )
1301 .await
1302 {
1303 Ok(()) => {
1304 emit_plan_event(
1305 ctx,
1306 resource,
1307 &plan,
1308 PlanEventType::ApplySucceeded,
1309 )
1310 .await;
1311 }
1312 Err(err) => {
1313 emit_plan_event(
1314 ctx,
1315 resource,
1316 &plan,
1317 PlanEventType::ApplyFailed {
1318 error: err.to_string(),
1319 },
1320 )
1321 .await;
1322 return Err(err);
1323 }
1324 }
1325
1326 ctx.observability.record_apply_result("success");
1327
1328 update_status(ctx, resource, |status| {
1330 status.set_condition(ready_condition(
1331 true,
1332 "Reconciled",
1333 "Approved plan applied",
1334 ));
1335 status.set_condition(drifted_condition(
1336 false,
1337 "InSync",
1338 "No pending changes",
1339 ));
1340 status.conditions.retain(|c| {
1341 c.condition_type != "Reconciling"
1342 && c.condition_type != "Degraded"
1343 && c.condition_type != "Conflict"
1344 && c.condition_type != "Paused"
1345 });
1346 status.observed_generation = generation;
1347 status.last_attempted_generation = generation;
1348 status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1349 status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1350 status.change_summary = Some(summary);
1351 status.last_reconcile_mode = Some(PolicyMode::Apply);
1352 status.planned_sql = None;
1353 status.planned_sql_truncated = false;
1354 status.last_error = None;
1355 status.applied_password_source_versions =
1356 applied_password_source_versions;
1357 status.transient_failure_count = 0;
1358 })
1359 .await?;
1360
1361 return Ok((
1362 Action::requeue(requeue_interval),
1363 ReconcileOutcome::Reconciled,
1364 ));
1365 }
1366 crate::plan::PlanApprovalState::Rejected => {
1367 crate::plan::mark_plan_rejected(&ctx.kube_client, ¤t_plan).await?;
1368 emit_plan_event(ctx, resource, ¤t_plan, PlanEventType::Rejected)
1369 .await;
1370 info!(
1371 name,
1372 namespace,
1373 plan = %current_plan.name_any(),
1374 "plan rejected via annotation"
1375 );
1376
1377 update_status(ctx, resource, |status| {
1380 status.set_condition(ready_condition(
1381 true,
1382 "Planned",
1383 &format!(
1384 "Plan {} rejected; new plan will be created on next reconcile",
1385 current_plan.name_any()
1386 ),
1387 ));
1388 status.last_attempted_generation = generation;
1389 status.last_error = None;
1390 status.transient_failure_count = 0;
1391 status.current_plan_ref = None;
1392 })
1393 .await?;
1394
1395 return Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned));
1396 }
1397 crate::plan::PlanApprovalState::Pending => {
1398 info!(
1400 name,
1401 namespace,
1402 plan = %current_plan.name_any(),
1403 "plan awaiting manual approval"
1404 );
1405
1406 update_status(ctx, resource, |status| {
1407 let msg = format!(
1408 "Plan {} awaiting approval; {} change(s) pending",
1409 current_plan.name_any(),
1410 summary.total,
1411 );
1412 status.set_condition(ready_condition(true, "Planned", &msg));
1413 status.set_condition(drifted_condition(
1414 !changes.is_empty(),
1415 if changes.is_empty() {
1416 "InSync"
1417 } else {
1418 "DriftDetected"
1419 },
1420 &msg,
1421 ));
1422 status.conditions.retain(|c| {
1423 c.condition_type != "Reconciling"
1424 && c.condition_type != "Degraded"
1425 && c.condition_type != "Conflict"
1426 && c.condition_type != "Paused"
1427 });
1428 status.last_attempted_generation = generation;
1429 status.change_summary = Some(summary.clone());
1430 status.planned_sql = planned_sql.clone();
1431 status.planned_sql_truncated = planned_sql_truncated;
1432 status.last_error = None;
1433 status.transient_failure_count = 0;
1434 })
1435 .await?;
1436
1437 return Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned));
1438 }
1439 }
1440 }
1441
1442 if changes.is_empty() {
1444 info!(name, namespace, "no changes needed (manual approval mode)");
1445
1446 update_status(ctx, resource, |status| {
1447 status.set_condition(ready_condition(true, "Reconciled", "No changes needed"));
1448 status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
1449 status.conditions.retain(|c| {
1450 c.condition_type != "Reconciling"
1451 && c.condition_type != "Degraded"
1452 && c.condition_type != "Conflict"
1453 && c.condition_type != "Paused"
1454 });
1455 status.observed_generation = generation;
1456 status.last_attempted_generation = generation;
1457 status.last_successful_reconcile_time = Some(crate::crd::now_rfc3339());
1458 status.last_reconcile_time = Some(crate::crd::now_rfc3339());
1459 status.change_summary = Some(summary);
1460 status.last_reconcile_mode = Some(PolicyMode::Apply);
1461 status.planned_sql = None;
1462 status.planned_sql_truncated = false;
1463 status.last_error = None;
1464 status.applied_password_source_versions = applied_password_source_versions;
1465 status.transient_failure_count = 0;
1466 })
1467 .await?;
1468
1469 return Ok((
1470 Action::requeue(requeue_interval),
1471 ReconcileOutcome::Reconciled,
1472 ));
1473 }
1474
1475 let creation_result = crate::plan::create_or_update_plan(
1477 &ctx.kube_client,
1478 resource,
1479 &changes,
1480 &sql_ctx,
1481 &inspect_config,
1482 resource.spec.reconciliation_mode,
1483 identity.as_str(),
1484 &summary,
1485 )
1486 .await?;
1487 let plan_name = creation_result.plan_name().to_string();
1488
1489 if creation_result.is_created() {
1491 let plans_api: Api<PostgresPolicyPlan> =
1492 Api::namespaced(ctx.kube_client.clone(), namespace);
1493 let created_plan = plans_api.get(&plan_name).await?;
1494 emit_plan_event(
1495 ctx,
1496 resource,
1497 &created_plan,
1498 PlanEventType::Created {
1499 change_count: summary.total,
1500 },
1501 )
1502 .await;
1503 }
1504
1505 crate::plan::update_policy_plan_ref(&ctx.kube_client, resource, &plan_name).await?;
1506
1507 let msg = format!(
1508 "Plan {plan_name} created; {} change(s) awaiting approval",
1509 summary.total,
1510 );
1511 update_status(ctx, resource, |status| {
1512 status.set_condition(ready_condition(true, "Planned", &msg));
1513 status.set_condition(drifted_condition(
1514 true,
1515 "DriftDetected",
1516 &format!("{} planned change(s) pending review", summary.total),
1517 ));
1518 status.conditions.retain(|c| {
1519 c.condition_type != "Reconciling"
1520 && c.condition_type != "Degraded"
1521 && c.condition_type != "Conflict"
1522 && c.condition_type != "Paused"
1523 });
1524 status.last_attempted_generation = generation;
1525 status.change_summary = Some(summary.clone());
1526 status.last_reconcile_mode = Some(PolicyMode::Apply);
1527 status.planned_sql = planned_sql.clone();
1528 status.planned_sql_truncated = planned_sql_truncated;
1529 status.last_error = None;
1530 status.transient_failure_count = 0;
1531 status.current_plan_ref = Some(crate::crd::PlanReference {
1532 name: plan_name.clone(),
1533 });
1534 })
1535 .await?;
1536
1537 info!(
1538 name,
1539 namespace,
1540 total = summary.total,
1541 plan = %plan_name,
1542 "plan created, awaiting manual approval"
1543 );
1544
1545 Ok((Action::requeue(requeue_interval), ReconcileOutcome::Planned))
1546 }
1547 }
1548}
1549
1550async fn resolve_passwords_from_secrets(
1562 ctx: &OperatorContext,
1563 resource: &PostgresPolicy,
1564 namespace: &str,
1565) -> Result<std::collections::BTreeMap<String, ResolvedPassword>, ReconcileError> {
1566 use k8s_openapi::api::core::v1::Secret;
1567
1568 let mut resolved = std::collections::BTreeMap::new();
1569
1570 let mut secret_cache: std::collections::BTreeMap<String, Secret> =
1573 std::collections::BTreeMap::new();
1574
1575 let secrets_api: kube::Api<Secret> = kube::Api::namespaced(ctx.kube_client.clone(), namespace);
1576
1577 for role_spec in &resource.spec.roles {
1579 if let Some(pw) = &role_spec.password
1580 && let Some(secret_ref) = &pw.secret_ref
1581 {
1582 let secret_name = &secret_ref.name;
1583 if !secret_cache.contains_key(secret_name.as_str()) {
1584 let fetched = secrets_api.get(secret_name).await.map_err(|err| {
1585 Box::new(crate::context::ContextError::SecretFetch {
1586 name: secret_name.clone(),
1587 namespace: namespace.to_string(),
1588 source: err,
1589 })
1590 })?;
1591 secret_cache.insert(secret_name.clone(), fetched);
1592 }
1593 }
1594 }
1595
1596 for role_spec in &resource.spec.roles {
1598 if let Some(pw) = &role_spec.password {
1599 if let Some(gen_spec) = &pw.generate {
1600 let password = if resource.spec.mode == PolicyMode::Plan {
1601 match crate::password::get_generated_secret(
1602 ctx.kube_client.clone(),
1603 namespace,
1604 &resource.name_any(),
1605 &role_spec.name,
1606 gen_spec,
1607 )
1608 .await
1609 .map_err(Box::new)?
1610 {
1611 Some(existing) => existing,
1612 None => {
1613 let secret_name = crate::password::generated_secret_name(
1614 &resource.name_any(),
1615 &role_spec.name,
1616 gen_spec,
1617 );
1618 let secret_key = crate::password::generated_secret_key(gen_spec);
1619 let cleartext = crate::password::generate_password(
1620 gen_spec
1621 .length
1622 .unwrap_or(crate::password::DEFAULT_PASSWORD_LENGTH),
1623 );
1624
1625 crate::password::GeneratedPasswordSecret {
1626 password: cleartext,
1627 source_version:
1628 crate::password::missing_generated_secret_source_version(
1629 &secret_name,
1630 &secret_key,
1631 ),
1632 }
1633 }
1634 }
1635 } else {
1636 crate::password::ensure_generated_secret(
1638 ctx.kube_client.clone(),
1639 namespace,
1640 resource,
1641 &role_spec.name,
1642 gen_spec,
1643 )
1644 .await
1645 .map_err(Box::new)?
1646 };
1647 resolved.insert(
1648 role_spec.name.clone(),
1649 ResolvedPassword {
1650 cleartext: password.password,
1651 source_version: password.source_version,
1652 },
1653 );
1654 } else if pw.secret_ref.is_some() {
1655 let password = resolve_password_from_cache(&role_spec.name, pw, &secret_cache)?;
1657 resolved.insert(role_spec.name.clone(), password);
1658 }
1659 }
1660 }
1661
1662 Ok(resolved)
1663}
1664
1665fn resolve_password_from_cache(
1667 role_name: &str,
1668 password_spec: &crate::crd::PasswordSpec,
1669 secret_cache: &std::collections::BTreeMap<String, k8s_openapi::api::core::v1::Secret>,
1670) -> Result<ResolvedPassword, ReconcileError> {
1671 let secret_ref = password_spec.secret_ref.as_ref().ok_or_else(|| {
1672 Box::new(crate::context::ContextError::SecretMissing {
1673 name: "(no secretRef)".to_string(),
1674 key: role_name.to_string(),
1675 })
1676 })?;
1677 let secret_name = &secret_ref.name;
1678 let secret_key = password_spec.secret_key.as_deref().unwrap_or(role_name);
1679
1680 let secret = secret_cache.get(secret_name.as_str()).ok_or_else(|| {
1681 Box::new(crate::context::ContextError::SecretMissing {
1682 name: secret_name.clone(),
1683 key: secret_key.to_string(),
1684 })
1685 })?;
1686
1687 let data = secret.data.as_ref().ok_or_else(|| {
1688 Box::new(crate::context::ContextError::SecretMissing {
1689 name: secret_name.clone(),
1690 key: secret_key.to_string(),
1691 })
1692 })?;
1693
1694 let value_bytes = data.get(secret_key).ok_or_else(|| {
1695 Box::new(crate::context::ContextError::SecretMissing {
1696 name: secret_name.clone(),
1697 key: secret_key.to_string(),
1698 })
1699 })?;
1700
1701 let password = String::from_utf8(value_bytes.0.clone()).map_err(|_| {
1702 Box::new(crate::context::ContextError::SecretMissing {
1703 name: secret_name.clone(),
1704 key: secret_key.to_string(),
1705 })
1706 })?;
1707
1708 if password.is_empty() {
1709 return Err(ReconcileError::EmptyPasswordSecret {
1710 role: role_name.to_string(),
1711 secret: secret_name.clone(),
1712 key: secret_key.to_string(),
1713 });
1714 }
1715
1716 let resource_version = secret
1717 .metadata
1718 .resource_version
1719 .as_deref()
1720 .unwrap_or("unknown");
1721 Ok(ResolvedPassword {
1722 cleartext: password,
1723 source_version: format!("{secret_name}:{secret_key}:{resource_version}"),
1724 })
1725}
1726
/// Test-only resolver that works purely from an in-memory Secret cache.
///
/// Only roles whose password spec has a `secret_ref` are considered; each one is
/// resolved through [`resolve_password_from_cache`]. The first failure aborts the
/// whole resolution and is returned as the error.
#[cfg(test)]
fn resolve_passwords_from_cached_secrets(
    resource: &PostgresPolicy,
    secret_cache: &std::collections::BTreeMap<String, k8s_openapi::api::core::v1::Secret>,
) -> Result<std::collections::BTreeMap<String, ResolvedPassword>, ReconcileError> {
    resource
        .spec
        .roles
        .iter()
        .filter_map(|role| {
            role.password
                .as_ref()
                .filter(|pw| pw.secret_ref.is_some())
                .map(|pw| (role, pw))
        })
        .map(|(role, pw)| {
            resolve_password_from_cache(&role.name, pw, secret_cache)
                .map(|password| (role.name.clone(), password))
        })
        .collect()
}
1744
1745fn select_password_changes(
1746 changes: &[pgroles_core::diff::Change],
1747 resolved_passwords: &std::collections::BTreeMap<String, ResolvedPassword>,
1748 status: Option<&PostgresPolicyStatus>,
1749) -> (
1750 std::collections::BTreeMap<String, String>,
1751 std::collections::BTreeMap<String, String>,
1752) {
1753 let created_roles: std::collections::BTreeSet<&str> = changes
1754 .iter()
1755 .filter_map(|change| match change {
1756 pgroles_core::diff::Change::CreateRole { name, .. } => Some(name.as_str()),
1757 _ => None,
1758 })
1759 .collect();
1760 let previous_versions = status
1761 .map(|status| &status.applied_password_source_versions)
1762 .cloned()
1763 .unwrap_or_default();
1764
1765 let mut password_changes = std::collections::BTreeMap::new();
1766 let mut current_versions = std::collections::BTreeMap::new();
1767
1768 for (role, resolved) in resolved_passwords {
1769 current_versions.insert(role.clone(), resolved.source_version.clone());
1770 if created_roles.contains(role.as_str())
1771 || previous_versions.get(role) != Some(&resolved.source_version)
1772 {
1773 password_changes.insert(role.clone(), resolved.cleartext.clone());
1774 }
1775 }
1776
1777 (password_changes, current_versions)
1778}
1779
1780async fn reconcile_cleanup(
1782 resource: &PostgresPolicy,
1783 ctx: &OperatorContext,
1784) -> Result<Action, ReconcileError> {
1785 let name = resource.name_any();
1786 let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
1787
1788 info!(name, namespace, "cleaning up (resource deleted)");
1789
1790 ctx.evict_pool(&namespace, &resource.spec.connection).await;
1792
1793 Ok(Action::await_change())
1798}
1799
1800fn accumulate_summary(summary: &mut ChangeSummary, change: &pgroles_core::diff::Change) {
1802 use pgroles_core::diff::Change;
1803 match change {
1804 Change::CreateRole { .. } => summary.roles_created += 1,
1805 Change::CreateSchema { .. } => summary.schemas_created += 1,
1806 Change::AlterSchemaOwner { .. } => summary.schema_owners_altered += 1,
1807 Change::AlterRole { .. } => summary.roles_altered += 1,
1808 Change::SetComment { .. } => summary.roles_altered += 1,
1809 Change::DropRole { .. } => summary.roles_dropped += 1,
1810 Change::TerminateSessions { .. } => summary.sessions_terminated += 1,
1811 Change::ReassignOwned { .. } => {}
1812 Change::DropOwned { .. } => {}
1813 Change::Grant { .. } | Change::EnsureSchemaOwnerPrivileges { .. } => {
1814 summary.grants_added += 1
1815 }
1816 Change::Revoke { .. } => summary.grants_revoked += 1,
1817 Change::SetDefaultPrivilege { .. } => summary.default_privileges_set += 1,
1818 Change::RevokeDefaultPrivilege { .. } => summary.default_privileges_revoked += 1,
1819 Change::AddMember { .. } => summary.members_added += 1,
1820 Change::RemoveMember { .. } => summary.members_removed += 1,
1821 Change::SetPassword { .. } => summary.passwords_set += 1,
1822 }
1823}
1824
1825fn summarize_changes(changes: &[pgroles_core::diff::Change]) -> ChangeSummary {
1826 let mut summary = ChangeSummary::default();
1827 for change in changes {
1828 accumulate_summary(&mut summary, change);
1829 }
1830 summary.total = summary.roles_created
1831 + summary.roles_altered
1832 + summary.schemas_created
1833 + summary.schema_owners_altered
1834 + summary.roles_dropped
1835 + summary.sessions_terminated
1836 + summary.grants_added
1837 + summary.grants_revoked
1838 + summary.default_privileges_set
1839 + summary.default_privileges_revoked
1840 + summary.members_added
1841 + summary.members_removed
1842 + summary.passwords_set;
1843 summary
1844}
1845
/// Parses a UTC RFC 3339 timestamp (`YYYY-MM-DDTHH:MM:SS[.fff]Z`) into seconds
/// since the Unix epoch.
///
/// Only the fixed-width date and time fields are read; anything between the
/// seconds field and the trailing `Z` (such as fractional seconds) is ignored,
/// and the separator characters are not inspected.
///
/// Returns `None` when:
///
/// * the string is shorter than 20 bytes or does not end in `Z`,
/// * a field is not made up solely of ASCII digits,
/// * a field is out of range (month 1–12, day valid for that month and year,
///   hour 0–23, minute 0–59, second 0–60 to admit a leap second), or
/// * the date lies before 1970-01-01, which cannot be represented as `u64`.
///
/// The function never panics: the unsigned subtractions are guarded by the
/// range checks above and by `checked_sub`.
fn parse_rfc3339_to_epoch_secs(timestamp: &str) -> Option<u64> {
    // Shortest accepted shape is `YYYY-MM-DDTHH:MM:SSZ`; only UTC is supported.
    if timestamp.len() < 20 || !timestamp.ends_with('Z') {
        return None;
    }

    // Reads one fixed-width numeric field. `str::parse::<u64>` alone would also
    // accept a leading `+`, so require ASCII digits explicitly. `get` returns
    // `None` instead of panicking when the range splits a multi-byte character.
    let field = |range: std::ops::Range<usize>| -> Option<u64> {
        let digits = timestamp.get(range)?;
        if digits.is_empty() || !digits.bytes().all(|byte| byte.is_ascii_digit()) {
            return None;
        }
        digits.parse().ok()
    };

    let year = field(0..4)?;
    let month = field(5..7)?;
    let day = field(8..10)?;
    let hours = field(11..13)?;
    let minutes = field(14..16)?;
    let seconds = field(17..19)?;

    // Pre-epoch instants have no `u64` representation.
    if year < 1970 {
        return None;
    }

    let is_leap_year = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    let days_in_month = match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if is_leap_year => 29,
        2 => 28,
        _ => return None,
    };
    if day == 0 || day > days_in_month || hours > 23 || minutes > 59 || seconds > 60 {
        return None;
    }

    // Days-from-civil conversion: shift the year so that it starts in March,
    // which places the leap day at the end of the shifted year.
    let (y, m) = if month <= 2 {
        (year - 1, month + 9)
    } else {
        (year, month - 3)
    };
    let era = y / 400;
    let yoe = y - era * 400;
    let doy = (153 * m + 2) / 5 + day - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    // 719_468 is the number of days from 0000-03-01 to 1970-01-01.
    let days_since_epoch = (era * 146_097 + doe).checked_sub(719_468)?;

    Some(days_since_epoch * 86_400 + hours * 3_600 + minutes * 60 + seconds)
}
1876
1877async fn detect_sql_context(
1878 pool: &sqlx::PgPool,
1879 inspect_config: &pgroles_inspect::InspectConfig,
1880) -> Result<pgroles_core::sql::SqlContext, ReconcileError> {
1881 let pg_version = pgroles_inspect::detect_pg_version(pool).await?;
1882 let privilege_schemas: Vec<&str> = inspect_config
1883 .privilege_schemas
1884 .iter()
1885 .map(|schema| schema.as_str())
1886 .collect();
1887 let relation_inventory =
1888 pgroles_inspect::fetch_relation_inventory(pool, &privilege_schemas).await?;
1889 Ok(
1890 pgroles_core::sql::SqlContext::from_version_num(pg_version.version_num)
1891 .with_relation_inventory(relation_inventory),
1892 )
1893}
1894
1895fn render_plan_sql_for_status(
1896 changes: &[pgroles_core::diff::Change],
1897 sql_ctx: &pgroles_core::sql::SqlContext,
1898) -> (Option<String>, bool) {
1899 if changes.is_empty() {
1900 return (None, false);
1901 }
1902
1903 let rendered: String = changes
1905 .iter()
1906 .flat_map(|change| {
1907 if let pgroles_core::diff::Change::SetPassword { name, .. } = change {
1908 vec![format!(
1909 "ALTER ROLE {} PASSWORD '[REDACTED]';",
1910 pgroles_core::sql::quote_ident(name)
1911 )]
1912 } else {
1913 pgroles_core::sql::render_statements_with_context(change, sql_ctx)
1914 }
1915 })
1916 .collect::<Vec<_>>()
1917 .join("\n");
1918
1919 let (truncated, did_truncate) = truncate_status_text(&rendered, MAX_PLANNED_SQL_STATUS_BYTES);
1920 (Some(truncated), did_truncate)
1921}
1922
/// Caps `text` at `max_bytes`, appending a truncation marker when it is cut.
///
/// Returns the (possibly shortened) text and whether truncation happened. Text
/// that already fits is returned unchanged. Otherwise the text is cut at the
/// last UTF-8 character boundary that leaves room for the marker
/// `"\n-- truncated for status --"`, and the marker is appended.
///
/// Note: when `max_bytes` is smaller than the marker itself, the result is the
/// marker alone and is therefore longer than `max_bytes`.
fn truncate_status_text(text: &str, max_bytes: usize) -> (String, bool) {
    const MARKER: &str = "\n-- truncated for status --";

    if text.len() <= max_bytes {
        return (text.to_owned(), false);
    }

    // Space left for original text once the marker is accounted for. This is
    // always below `text.len()` here because `max_bytes < text.len()`.
    let budget = max_bytes.saturating_sub(MARKER.len());

    // Walk back to the nearest character boundary; index 0 always qualifies.
    let cut = (0..=budget)
        .rev()
        .find(|&index| text.is_char_boundary(index))
        .unwrap_or(0);

    let mut shortened = String::with_capacity(cut + MARKER.len());
    shortened.push_str(&text[..cut]);
    shortened.push_str(MARKER);
    (shortened, true)
}
1939
1940async fn emit_plan_event(
1942 ctx: &OperatorContext,
1943 policy: &PostgresPolicy,
1944 plan: &PostgresPolicyPlan,
1945 event_type: PlanEventType,
1946) {
1947 if let Err(error) = publish_plan_event(&ctx.event_recorder, policy, plan, event_type).await {
1948 let namespace = policy.namespace().unwrap_or_default();
1949 let name = policy.name_any();
1950 tracing::warn!(
1951 policy = %format!("{namespace}/{name}"),
1952 %error,
1953 "failed to publish plan lifecycle event"
1954 );
1955 }
1956}
1957
1958async fn update_status<F>(
1960 ctx: &OperatorContext,
1961 resource: &PostgresPolicy,
1962 mutate: F,
1963) -> Result<(), ReconcileError>
1964where
1965 F: FnOnce(&mut PostgresPolicyStatus),
1966{
1967 let namespace = resource.namespace().ok_or(ReconcileError::NoNamespace)?;
1968 let name = resource.name_any();
1969
1970 let api: Api<PostgresPolicy> = Api::namespaced(ctx.kube_client.clone(), &namespace);
1971 let latest = api.get(&name).await?;
1972 let old_status = latest.status.clone();
1973 let mut status = old_status.clone().unwrap_or_default();
1974
1975 mutate(&mut status);
1976
1977 let patch = serde_json::json!({
1978 "status": status
1979 });
1980
1981 api.patch_status(
1982 &name,
1983 &PatchParams::apply("pgroles-operator"),
1984 &Patch::Merge(&patch),
1985 )
1986 .await?;
1987
1988 if let Err(error) =
1989 publish_status_events(&ctx.event_recorder, &latest, old_status.as_ref(), &status).await
1990 {
1991 tracing::warn!(policy = %format!("{namespace}/{name}"), %error, "failed to publish Kubernetes Events");
1992 }
1993
1994 Ok(())
1995}
1996
1997async fn detect_policy_conflict(
1998 ctx: &OperatorContext,
1999 resource: &PostgresPolicy,
2000 identity: &DatabaseIdentity,
2001 ownership: &crate::crd::OwnershipClaims,
2002) -> Result<Option<String>, ReconcileError> {
2003 let api: Api<PostgresPolicy> = Api::all(ctx.kube_client.clone());
2004 let policies = api.list(&Default::default()).await?;
2005
2006 Ok(detect_policy_conflict_in_list(
2007 resource,
2008 identity,
2009 ownership,
2010 policies.into_iter(),
2011 ))
2012}
2013
2014fn detect_policy_conflict_in_list(
2015 resource: &PostgresPolicy,
2016 identity: &DatabaseIdentity,
2017 ownership: &crate::crd::OwnershipClaims,
2018 policies: impl IntoIterator<Item = PostgresPolicy>,
2019) -> Option<String> {
2020 let this_ns = resource.namespace()?;
2021 let this_name = resource.name_any();
2022
2023 let mut conflicts = Vec::new();
2024 for other in policies {
2025 let other_ns = match other.namespace() {
2026 Some(ns) => ns,
2027 None => continue,
2028 };
2029 let other_name = other.name_any();
2030 if other_ns == this_ns && other_name == this_name {
2031 continue;
2032 }
2033
2034 let other_identity = DatabaseIdentity::from_connection(&other_ns, &other.spec.connection);
2035 if &other_identity != identity {
2036 continue;
2037 }
2038
2039 if let Err(error) = other.spec.validate_password_specs(&other_name) {
2040 tracing::warn!(
2041 policy = %format!("{other_ns}/{other_name}"),
2042 database = %identity.as_str(),
2043 %error,
2044 "skipping conflict detection for invalid peer policy"
2045 );
2046 continue;
2047 }
2048
2049 let other_ownership = match other.spec.ownership_claims() {
2050 Ok(claims) => claims,
2051 Err(error) => {
2052 tracing::warn!(
2053 policy = %format!("{other_ns}/{other_name}"),
2054 database = %identity.as_str(),
2055 %error,
2056 "skipping conflict detection for invalid peer policy"
2057 );
2058 continue;
2059 }
2060 };
2061 if ownership.overlaps(&other_ownership) {
2062 let overlap = ownership.overlap_summary(&other_ownership);
2063 conflicts.push(format!("{other_ns}/{other_name} ({overlap})"));
2064 }
2065 }
2066
2067 if conflicts.is_empty() {
2068 None
2069 } else {
2070 Some(format!(
2071 "policy ownership overlaps with {} on database target {}",
2072 conflicts.join(", "),
2073 identity.as_str()
2074 ))
2075 }
2076}
2077
2078impl ReconcileError {
2079 fn reason(&self) -> &'static str {
2080 match self {
2081 ReconcileError::ManifestExpansion(_)
2082 | ReconcileError::InvalidInterval(_, _)
2083 | ReconcileError::InvalidSpec(_) => "InvalidSpec",
2084 ReconcileError::ConflictingPolicy(_) => "ConflictingPolicy",
2085 ReconcileError::UnsatisfiableWildcardGrant(_) => "UnsatisfiableWildcardGrant",
2086 ReconcileError::LockContention(_, _) => "LockContention",
2087 ReconcileError::Context(context) => match context.as_ref() {
2088 ContextError::SecretFetch { .. } => "SecretFetchFailed",
2089 ContextError::SecretMissing { .. } => "SecretMissing",
2090 ContextError::DatabaseConnect { .. } => "DatabaseConnectionFailed",
2091 ContextError::EmptyResolvedValue { .. } => "InvalidConnectionParams",
2092 ContextError::InvalidResolvedSslMode { .. } => "InvalidConnectionParams",
2093 },
2094 ReconcileError::Inspect(error) => match error {
2095 pgroles_inspect::InspectError::Database(sql_err) => {
2096 match classify_sqlx_error(sql_err) {
2097 SqlErrorKind::InsufficientPrivileges => "InsufficientPrivileges",
2098 SqlErrorKind::MissingDatabaseObject => "MissingDatabaseObject",
2099 SqlErrorKind::Transient => "DatabaseInspectionFailed",
2100 }
2101 }
2102 },
2103 ReconcileError::SqlExec(error) => match classify_sqlx_error(error) {
2104 SqlErrorKind::InsufficientPrivileges => "InsufficientPrivileges",
2105 SqlErrorKind::MissingDatabaseObject => "MissingDatabaseObject",
2106 SqlErrorKind::Transient => "ApplyFailed",
2107 },
2108 ReconcileError::UnsafeRoleDrops(_) => "UnsafeRoleDrops",
2109 ReconcileError::EmptyPasswordSecret { .. } => "InvalidSpec",
2110 ReconcileError::MissingDatabaseObjects(_) => "MissingDatabaseObject",
2111 ReconcileError::PasswordGeneration(_) => "SecretFetchFailed",
2112 ReconcileError::PlanSqlStorage(_) => "PlanSqlStorageFailed",
2113 ReconcileError::Kube(_) => "KubernetesApiError",
2114 ReconcileError::NoNamespace => "InvalidResource",
2115 }
2116 }
2117}
2118
2119#[cfg(test)]
2124mod tests {
2125 use super::*;
2126 use crate::crd::{
2127 ConnectionSpec, CrdReconciliationMode, PasswordSpec, PolicyMode, PostgresPolicySpec,
2128 RoleSpec, SecretReference,
2129 };
2130 use k8s_openapi::{
2131 ByteString, api::core::v1::Secret, apimachinery::pkg::apis::meta::v1::ObjectMeta,
2132 };
2133 use sqlx::error::{DatabaseError, ErrorKind};
2134 use std::borrow::Cow;
2135 use std::collections::BTreeMap;
2136 use std::error::Error as StdError;
2137 use std::fmt;
2138
2139 #[derive(Debug)]
2140 struct TestDatabaseError {
2141 message: String,
2142 code: Option<&'static str>,
2143 }
2144
2145 impl fmt::Display for TestDatabaseError {
2146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2147 f.write_str(&self.message)
2148 }
2149 }
2150
2151 impl StdError for TestDatabaseError {}
2152
2153 impl DatabaseError for TestDatabaseError {
2154 fn message(&self) -> &str {
2155 &self.message
2156 }
2157
2158 fn code(&self) -> Option<Cow<'_, str>> {
2159 self.code.map(Cow::Borrowed)
2160 }
2161
2162 fn as_error(&self) -> &(dyn StdError + Send + Sync + 'static) {
2163 self
2164 }
2165
2166 fn as_error_mut(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) {
2167 self
2168 }
2169
2170 fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static> {
2171 self
2172 }
2173
2174 fn kind(&self) -> ErrorKind {
2175 ErrorKind::Other
2176 }
2177 }
2178
2179 fn insufficient_privilege_sqlx_error() -> sqlx::Error {
2180 sqlx::Error::Database(Box::new(TestDatabaseError {
2181 message: "permission denied to create role".to_string(),
2182 code: Some(SQLSTATE_INSUFFICIENT_PRIVILEGE),
2183 }))
2184 }
2185
2186 fn missing_schema_sqlx_error() -> sqlx::Error {
2187 sqlx::Error::Database(Box::new(TestDatabaseError {
2188 message: "schema \"etl\" does not exist".to_string(),
2189 code: Some(SQLSTATE_INVALID_SCHEMA_NAME),
2190 }))
2191 }
2192
2193 fn missing_table_sqlx_error() -> sqlx::Error {
2194 sqlx::Error::Database(Box::new(TestDatabaseError {
2195 message: "relation \"foo\" does not exist".to_string(),
2196 code: Some(SQLSTATE_UNDEFINED_TABLE),
2197 }))
2198 }
2199
2200 fn missing_function_sqlx_error() -> sqlx::Error {
2201 sqlx::Error::Database(Box::new(TestDatabaseError {
2202 message: "function foo() does not exist".to_string(),
2203 code: Some(SQLSTATE_UNDEFINED_FUNCTION),
2204 }))
2205 }
2206
2207 fn missing_object_sqlx_error() -> sqlx::Error {
2208 sqlx::Error::Database(Box::new(TestDatabaseError {
2209 message: "role \"nope\" does not exist".to_string(),
2210 code: Some(SQLSTATE_UNDEFINED_OBJECT),
2211 }))
2212 }
2213
2214 fn transient_sqlx_error() -> sqlx::Error {
2215 sqlx::Error::Database(Box::new(TestDatabaseError {
2216 message: "connection timed out".to_string(),
2217 code: Some("08006"),
2218 }))
2219 }
2220
2221 fn test_policy(interval: &str, transient_failure_count: i32) -> Arc<PostgresPolicy> {
2222 let spec = PostgresPolicySpec {
2223 connection: ConnectionSpec {
2224 secret_ref: Some(SecretReference {
2225 name: "db-credentials".to_string(),
2226 }),
2227 secret_key: Some("DATABASE_URL".to_string()),
2228 params: None,
2229 },
2230 interval: interval.to_string(),
2231 suspend: false,
2232 mode: PolicyMode::Apply,
2233 reconciliation_mode: CrdReconciliationMode::default(),
2234 default_owner: None,
2235 profiles: Default::default(),
2236 schemas: Vec::new(),
2237 roles: Vec::new(),
2238 grants: Vec::new(),
2239 default_privileges: Vec::new(),
2240 memberships: Vec::new(),
2241 retirements: Vec::new(),
2242 approval: None,
2243 };
2244 let mut resource = PostgresPolicy::new("example", spec);
2245 resource.metadata.namespace = Some("default".to_string());
2246 resource.status = Some(PostgresPolicyStatus {
2247 transient_failure_count,
2248 ..Default::default()
2249 });
2250 Arc::new(resource)
2251 }
2252
2253 fn test_policy_with_spec(name: &str, spec: PostgresPolicySpec) -> PostgresPolicy {
2254 let mut resource = PostgresPolicy::new(name, spec);
2255 resource.metadata.namespace = Some("default".to_string());
2256 resource
2257 }
2258
2259 fn valid_role_policy(name: &str, role_name: &str, secret_name: &str) -> PostgresPolicy {
2260 test_policy_with_spec(
2261 name,
2262 PostgresPolicySpec {
2263 connection: ConnectionSpec {
2264 secret_ref: Some(SecretReference {
2265 name: secret_name.to_string(),
2266 }),
2267 secret_key: Some("DATABASE_URL".to_string()),
2268 params: None,
2269 },
2270 interval: "5m".to_string(),
2271 suspend: false,
2272 mode: PolicyMode::Apply,
2273 reconciliation_mode: CrdReconciliationMode::default(),
2274 default_owner: None,
2275 profiles: Default::default(),
2276 schemas: Vec::new(),
2277 roles: vec![RoleSpec {
2278 name: role_name.to_string(),
2279 login: Some(true),
2280 superuser: None,
2281 createdb: None,
2282 createrole: None,
2283 inherit: None,
2284 replication: None,
2285 bypassrls: None,
2286 connection_limit: None,
2287 comment: None,
2288 password: None,
2289 password_valid_until: None,
2290 }],
2291 grants: Vec::new(),
2292 default_privileges: Vec::new(),
2293 memberships: Vec::new(),
2294 retirements: Vec::new(),
2295 approval: None,
2296 },
2297 )
2298 }
2299
2300 fn invalid_profile_policy(name: &str, secret_name: &str) -> PostgresPolicy {
2301 test_policy_with_spec(
2302 name,
2303 PostgresPolicySpec {
2304 connection: ConnectionSpec {
2305 secret_ref: Some(SecretReference {
2306 name: secret_name.to_string(),
2307 }),
2308 secret_key: Some("DATABASE_URL".to_string()),
2309 params: None,
2310 },
2311 interval: "5m".to_string(),
2312 suspend: false,
2313 mode: PolicyMode::Apply,
2314 reconciliation_mode: CrdReconciliationMode::default(),
2315 default_owner: None,
2316 profiles: Default::default(),
2317 schemas: vec![pgroles_core::manifest::SchemaBinding {
2318 name: "reporting".to_string(),
2319 profiles: vec!["missing-profile".to_string()],
2320 role_pattern: "{schema}-{profile}".to_string(),
2321 owner: None,
2322 }],
2323 roles: Vec::new(),
2324 grants: Vec::new(),
2325 default_privileges: Vec::new(),
2326 memberships: Vec::new(),
2327 retirements: Vec::new(),
2328 approval: None,
2329 },
2330 )
2331 }
2332
2333 fn password_role_policy() -> PostgresPolicy {
2334 test_policy_with_spec(
2335 "password-policy",
2336 PostgresPolicySpec {
2337 connection: ConnectionSpec {
2338 secret_ref: Some(SecretReference {
2339 name: "db-credentials".to_string(),
2340 }),
2341 secret_key: Some("DATABASE_URL".to_string()),
2342 params: None,
2343 },
2344 interval: "5m".to_string(),
2345 suspend: false,
2346 mode: PolicyMode::Apply,
2347 reconciliation_mode: CrdReconciliationMode::default(),
2348 default_owner: None,
2349 profiles: Default::default(),
2350 schemas: Vec::new(),
2351 roles: vec![
2352 RoleSpec {
2353 name: "app".to_string(),
2354 login: Some(true),
2355 superuser: None,
2356 createdb: None,
2357 createrole: None,
2358 inherit: None,
2359 replication: None,
2360 bypassrls: None,
2361 connection_limit: None,
2362 comment: None,
2363 password: Some(PasswordSpec {
2364 secret_ref: Some(SecretReference {
2365 name: "role-passwords".to_string(),
2366 }),
2367 secret_key: None,
2368 generate: None,
2369 }),
2370 password_valid_until: None,
2371 },
2372 RoleSpec {
2373 name: "reporter".to_string(),
2374 login: Some(true),
2375 superuser: None,
2376 createdb: None,
2377 createrole: None,
2378 inherit: None,
2379 replication: None,
2380 bypassrls: None,
2381 connection_limit: None,
2382 comment: None,
2383 password: Some(PasswordSpec {
2384 secret_ref: Some(SecretReference {
2385 name: "role-passwords".to_string(),
2386 }),
2387 secret_key: Some("reporter-password".to_string()),
2388 generate: None,
2389 }),
2390 password_valid_until: None,
2391 },
2392 ],
2393 grants: Vec::new(),
2394 default_privileges: Vec::new(),
2395 memberships: Vec::new(),
2396 retirements: Vec::new(),
2397 approval: None,
2398 },
2399 )
2400 }
2401
2402 fn secret_with_keys(name: &str, entries: &[(&str, &str)]) -> Secret {
2403 secret_with_keys_and_version(name, "1", entries)
2404 }
2405
2406 fn secret_with_keys_and_version(
2407 name: &str,
2408 resource_version: &str,
2409 entries: &[(&str, &str)],
2410 ) -> Secret {
2411 Secret {
2412 metadata: ObjectMeta {
2413 name: Some(name.to_string()),
2414 resource_version: Some(resource_version.to_string()),
2415 ..Default::default()
2416 },
2417 data: Some(
2418 entries
2419 .iter()
2420 .map(|(key, value)| ((*key).to_string(), ByteString(value.as_bytes().to_vec())))
2421 .collect(),
2422 ),
2423 ..Default::default()
2424 }
2425 }
2426
2427 #[test]
2428 fn parse_interval_minutes() {
2429 let d = parse_interval("5m").unwrap();
2430 assert_eq!(d, Duration::from_secs(300));
2431 }
2432
2433 #[test]
2434 fn parse_interval_hours() {
2435 let d = parse_interval("1h").unwrap();
2436 assert_eq!(d, Duration::from_secs(3600));
2437 }
2438
2439 #[test]
2440 fn parse_interval_seconds() {
2441 let d = parse_interval("30s").unwrap();
2442 assert_eq!(d, Duration::from_secs(30));
2443 }
2444
2445 #[test]
2446 fn parse_interval_compound() {
2447 let d = parse_interval("1h30m").unwrap();
2448 assert_eq!(d, Duration::from_secs(5400));
2449 }
2450
2451 #[test]
2452 fn parse_interval_empty_uses_default() {
2453 let d = parse_interval("").unwrap();
2454 assert_eq!(d, Duration::from_secs(DEFAULT_REQUEUE_SECS));
2455 }
2456
2457 #[test]
2458 fn parse_interval_bare_number_treated_as_seconds() {
2459 let d = parse_interval("120").unwrap();
2460 assert_eq!(d, Duration::from_secs(120));
2461 }
2462
2463 #[test]
2464 fn parse_interval_invalid_unit() {
2465 let result = parse_interval("5x");
2466 assert!(result.is_err());
2467 }
2468
2469 #[test]
2470 fn accumulate_summary_counts() {
2471 use pgroles_core::diff::Change;
2472 use pgroles_core::model::RoleState;
2473
2474 let mut summary = ChangeSummary::default();
2475
2476 accumulate_summary(
2477 &mut summary,
2478 &Change::CreateRole {
2479 name: "test".to_string(),
2480 state: RoleState {
2481 login: true,
2482 ..RoleState::default()
2483 },
2484 },
2485 );
2486 accumulate_summary(
2487 &mut summary,
2488 &Change::Grant {
2489 role: "test".to_string(),
2490 object_type: pgroles_core::manifest::ObjectType::Schema,
2491 schema: None,
2492 name: Some("public".to_string()),
2493 privileges: [pgroles_core::manifest::Privilege::Usage]
2494 .into_iter()
2495 .collect(),
2496 },
2497 );
2498 accumulate_summary(
2499 &mut summary,
2500 &Change::TerminateSessions {
2501 role: "test".to_string(),
2502 },
2503 );
2504
2505 assert_eq!(summary.roles_created, 1);
2506 assert_eq!(summary.grants_added, 1);
2507 assert_eq!(summary.sessions_terminated, 1);
2508 }
2509
2510 #[test]
2511 fn accumulate_summary_counts_schema_changes_separately() {
2512 use pgroles_core::diff::Change;
2513
2514 let mut summary = ChangeSummary::default();
2515
2516 accumulate_summary(
2517 &mut summary,
2518 &Change::CreateSchema {
2519 name: "inventory".to_string(),
2520 owner: Some("inventory_owner".to_string()),
2521 },
2522 );
2523 accumulate_summary(
2524 &mut summary,
2525 &Change::AlterSchemaOwner {
2526 name: "catalog".to_string(),
2527 owner: "catalog_owner".to_string(),
2528 },
2529 );
2530
2531 assert_eq!(summary.schemas_created, 1);
2532 assert_eq!(summary.schema_owners_altered, 1);
2533 assert_eq!(summary.grants_added, 0);
2534 }
2535
2536 #[test]
2537 fn summarize_changes_sets_total() {
2538 use pgroles_core::diff::Change;
2539 use pgroles_core::model::RoleState;
2540
2541 let changes = vec![
2542 Change::CreateRole {
2543 name: "test".to_string(),
2544 state: RoleState::default(),
2545 },
2546 Change::CreateSchema {
2547 name: "inventory".to_string(),
2548 owner: Some("inventory_owner".to_string()),
2549 },
2550 Change::Grant {
2551 role: "test".to_string(),
2552 object_type: pgroles_core::manifest::ObjectType::Schema,
2553 schema: None,
2554 name: Some("public".to_string()),
2555 privileges: [pgroles_core::manifest::Privilege::Usage]
2556 .into_iter()
2557 .collect(),
2558 },
2559 ];
2560
2561 let summary = summarize_changes(&changes);
2562 assert_eq!(summary.roles_created, 1);
2563 assert_eq!(summary.schemas_created, 1);
2564 assert_eq!(summary.grants_added, 1);
2565 assert_eq!(summary.total, 3);
2566 }
2567
2568 #[test]
2569 fn truncate_status_text_marks_truncation() {
2570 let text = "x".repeat(MAX_PLANNED_SQL_STATUS_BYTES + 32);
2571 let (truncated, did_truncate) = truncate_status_text(&text, MAX_PLANNED_SQL_STATUS_BYTES);
2572 assert!(did_truncate);
2573 assert!(truncated.len() <= MAX_PLANNED_SQL_STATUS_BYTES);
2574 assert!(truncated.ends_with("-- truncated for status --"));
2575 }
2576
2577 #[test]
2578 fn accumulate_summary_all_change_types() {
2579 use pgroles_core::diff::Change;
2580 use pgroles_core::model::RoleState;
2581
2582 let mut summary = ChangeSummary::default();
2583
2584 accumulate_summary(
2585 &mut summary,
2586 &Change::CreateRole {
2587 name: "r1".to_string(),
2588 state: RoleState::default(),
2589 },
2590 );
2591 accumulate_summary(
2592 &mut summary,
2593 &Change::AlterRole {
2594 name: "r1".to_string(),
2595 attributes: vec![pgroles_core::model::RoleAttribute::Login(true)],
2596 },
2597 );
2598 accumulate_summary(
2599 &mut summary,
2600 &Change::CreateSchema {
2601 name: "schema1".to_string(),
2602 owner: Some("owner1".to_string()),
2603 },
2604 );
2605 accumulate_summary(
2606 &mut summary,
2607 &Change::AlterSchemaOwner {
2608 name: "schema2".to_string(),
2609 owner: "owner2".to_string(),
2610 },
2611 );
2612 accumulate_summary(
2613 &mut summary,
2614 &Change::SetComment {
2615 name: "r1".to_string(),
2616 comment: Some("comment".to_string()),
2617 },
2618 );
2619 accumulate_summary(
2620 &mut summary,
2621 &Change::DropRole {
2622 name: "r1".to_string(),
2623 },
2624 );
2625 accumulate_summary(
2626 &mut summary,
2627 &Change::TerminateSessions {
2628 role: "r1".to_string(),
2629 },
2630 );
2631 accumulate_summary(
2632 &mut summary,
2633 &Change::ReassignOwned {
2634 from_role: "r1".to_string(),
2635 to_role: "r2".to_string(),
2636 },
2637 );
2638 accumulate_summary(
2639 &mut summary,
2640 &Change::DropOwned {
2641 role: "r1".to_string(),
2642 },
2643 );
2644 accumulate_summary(
2645 &mut summary,
2646 &Change::Grant {
2647 role: "r1".to_string(),
2648 object_type: pgroles_core::manifest::ObjectType::Table,
2649 schema: Some("public".to_string()),
2650 name: Some("*".to_string()),
2651 privileges: [pgroles_core::manifest::Privilege::Select]
2652 .into_iter()
2653 .collect(),
2654 },
2655 );
2656 accumulate_summary(
2657 &mut summary,
2658 &Change::Revoke {
2659 role: "r1".to_string(),
2660 object_type: pgroles_core::manifest::ObjectType::Table,
2661 schema: Some("public".to_string()),
2662 name: Some("*".to_string()),
2663 privileges: [pgroles_core::manifest::Privilege::Select]
2664 .into_iter()
2665 .collect(),
2666 },
2667 );
2668 accumulate_summary(
2669 &mut summary,
2670 &Change::SetDefaultPrivilege {
2671 schema: "public".to_string(),
2672 owner: "owner".to_string(),
2673 grantee: "r1".to_string(),
2674 on_type: pgroles_core::manifest::ObjectType::Table,
2675 privileges: [pgroles_core::manifest::Privilege::Select]
2676 .into_iter()
2677 .collect(),
2678 },
2679 );
2680 accumulate_summary(
2681 &mut summary,
2682 &Change::RevokeDefaultPrivilege {
2683 schema: "public".to_string(),
2684 owner: "owner".to_string(),
2685 grantee: "r1".to_string(),
2686 on_type: pgroles_core::manifest::ObjectType::Table,
2687 privileges: [pgroles_core::manifest::Privilege::Select]
2688 .into_iter()
2689 .collect(),
2690 },
2691 );
2692 accumulate_summary(
2693 &mut summary,
2694 &Change::AddMember {
2695 role: "r1".to_string(),
2696 member: "r2".to_string(),
2697 inherit: true,
2698 admin: false,
2699 },
2700 );
2701 accumulate_summary(
2702 &mut summary,
2703 &Change::RemoveMember {
2704 role: "r1".to_string(),
2705 member: "r2".to_string(),
2706 },
2707 );
2708
2709 assert_eq!(summary.roles_created, 1);
2710 assert_eq!(summary.roles_altered, 2);
2712 assert_eq!(summary.schemas_created, 1);
2713 assert_eq!(summary.schema_owners_altered, 1);
2714 assert_eq!(summary.roles_dropped, 1);
2715 assert_eq!(summary.sessions_terminated, 1);
2716 assert_eq!(summary.grants_added, 1);
2717 assert_eq!(summary.grants_revoked, 1);
2718 assert_eq!(summary.default_privileges_set, 1);
2719 assert_eq!(summary.default_privileges_revoked, 1);
2720 assert_eq!(summary.members_added, 1);
2721 assert_eq!(summary.members_removed, 1);
2722 }
2723
2724 #[test]
2725 fn error_reason_invalid_spec_for_manifest_expansion() {
2726 let err = ReconcileError::ManifestExpansion(
2727 pgroles_core::manifest::ManifestError::UndefinedProfile("bad".into(), "schema1".into()),
2728 );
2729 assert_eq!(err.reason(), "InvalidSpec");
2730 }
2731
2732 #[test]
2733 fn error_reason_invalid_spec_for_invalid_interval() {
2734 let err = ReconcileError::InvalidInterval("5x".into(), "unknown unit 'x'".into());
2735 assert_eq!(err.reason(), "InvalidSpec");
2736 }
2737
2738 #[test]
2739 fn error_reason_invalid_spec_for_password_validation() {
2740 let err = ReconcileError::InvalidSpec("role password must set exactly one mode".into());
2741 assert_eq!(err.reason(), "InvalidSpec");
2742 }
2743
2744 #[test]
2745 fn error_reason_missing_database_objects() {
2746 let err = ReconcileError::MissingDatabaseObjects("schema \"etl\"".into());
2747 assert_eq!(err.reason(), "MissingDatabaseObject");
2748 }
2749
2750 #[test]
2751 fn error_reason_unsatisfiable_wildcard_grant() {
2752 let err = ReconcileError::UnsatisfiableWildcardGrant(
2753 "UnsatisfiableWildcardGrant: function f2() is not grantable".into(),
2754 );
2755 assert_eq!(err.reason(), "UnsatisfiableWildcardGrant");
2756 assert!(err.to_string().contains("UnsatisfiableWildcardGrant"));
2757 }
2758
2759 #[test]
2760 fn unsatisfiable_wildcard_status_is_degraded_without_plan_reference() {
2761 let message = "UnsatisfiableWildcardGrant: cannot fully satisfy wildcard grant EXECUTE ON function * IN SCHEMA \"app\" TO \"reader\" as executor \"app_owner\"; 1 matching object(s) are missing the desired privilege and are not grantable (examples: \"f2()\" owned by \"definer\" missing [EXECUTE])";
2762 let mut status = PostgresPolicyStatus {
2763 conditions: vec![
2764 ready_condition(true, "Planned", "Plan computed"),
2765 conflict_condition("ConflictingPolicy", "Policy overlaps another policy"),
2766 reconciling_condition("Reconciliation in progress"),
2767 drifted_condition(true, "DriftDetected", "1 planned change pending"),
2768 ],
2769 change_summary: Some(ChangeSummary {
2770 grants_added: 1,
2771 total: 1,
2772 ..Default::default()
2773 }),
2774 planned_sql: Some(
2775 "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA \"app\" TO \"reader\";".into(),
2776 ),
2777 planned_sql_truncated: true,
2778 last_error: None,
2779 transient_failure_count: 3,
2780 current_plan_ref: Some(crate::crd::PlanReference {
2781 name: "example-plan".into(),
2782 }),
2783 ..Default::default()
2784 };
2785
2786 mark_reconcile_failure_status(
2787 &mut status,
2788 "UnsatisfiableWildcardGrant",
2789 message,
2790 false,
2791 true,
2792 );
2793
2794 let ready = status
2795 .conditions
2796 .iter()
2797 .find(|condition| condition.condition_type == "Ready")
2798 .expect("Ready condition should be present");
2799 assert_eq!(ready.status, "False");
2800 assert_eq!(ready.reason.as_deref(), Some("UnsatisfiableWildcardGrant"));
2801 assert_eq!(ready.message.as_deref(), Some(message));
2802
2803 let degraded = status
2804 .conditions
2805 .iter()
2806 .find(|condition| condition.condition_type == "Degraded")
2807 .expect("Degraded condition should be present");
2808 assert_eq!(degraded.status, "True");
2809 assert_eq!(
2810 degraded.reason.as_deref(),
2811 Some("UnsatisfiableWildcardGrant")
2812 );
2813 assert_eq!(degraded.message.as_deref(), Some(message));
2814
2815 assert!(
2816 status.conditions.iter().all(|condition| {
2817 condition.condition_type != "Reconciling"
2818 && condition.condition_type != "Drifted"
2819 && condition.condition_type != "Conflict"
2820 }),
2821 "transient planning and stale conflict conditions should be cleared on degraded status"
2822 );
2823 assert!(status.change_summary.is_none());
2824 assert!(status.planned_sql.is_none());
2825 assert!(!status.planned_sql_truncated);
2826 assert!(status.current_plan_ref.is_none());
2827 assert_eq!(status.last_error.as_deref(), Some(message));
2828 assert_eq!(status.transient_failure_count, 0);
2829 }
2830
2831 #[test]
2832 fn reconcile_failure_status_preserves_plan_reference_when_requested() {
2833 let mut status = PostgresPolicyStatus {
2834 current_plan_ref: Some(crate::crd::PlanReference {
2835 name: "approved-plan".into(),
2836 }),
2837 planned_sql: Some("ALTER ROLE \"app\" LOGIN;".into()),
2838 planned_sql_truncated: true,
2839 transient_failure_count: 2,
2840 ..Default::default()
2841 };
2842
2843 mark_reconcile_failure_status(
2844 &mut status,
2845 "ApplyFailed",
2846 "SQL execution error: connection closed",
2847 true,
2848 false,
2849 );
2850
2851 assert_eq!(
2852 status
2853 .current_plan_ref
2854 .as_ref()
2855 .map(|plan| plan.name.as_str()),
2856 Some("approved-plan")
2857 );
2858 assert!(status.planned_sql.is_none());
2859 assert!(!status.planned_sql_truncated);
2860 assert_eq!(
2861 status.last_error.as_deref(),
2862 Some("SQL execution error: connection closed")
2863 );
2864 assert_eq!(status.transient_failure_count, 3);
2865 }
2866
2867 #[test]
2868 fn error_display_missing_database_objects_lists_schemas() {
2869 let err = ReconcileError::MissingDatabaseObjects("schema \"etl\", schema \"jobs\"".into());
2870 let msg = err.to_string();
2871 assert!(msg.contains("schema \"etl\""));
2872 assert!(msg.contains("schema \"jobs\""));
2873 assert!(
2874 msg.contains("pointing at the intended database"),
2875 "message should include remediation hint"
2876 );
2877 }
2878
2879 #[test]
2880 fn referenced_schema_names_from_schema_grants() {
2881 use pgroles_core::manifest::{
2882 ExpandedManifest, Grant, ObjectTarget, ObjectType, Privilege,
2883 };
2884 let expanded = ExpandedManifest {
2885 schemas: Vec::new(),
2886 roles: Vec::new(),
2887 grants: vec![Grant {
2888 role: "app".into(),
2889 privileges: vec![Privilege::Usage],
2890 object: ObjectTarget {
2891 object_type: ObjectType::Schema,
2892 schema: None,
2893 name: Some("etl".into()),
2894 },
2895 }],
2896 default_privileges: Vec::new(),
2897 memberships: Vec::new(),
2898 };
2899 let names = referenced_schema_names(&expanded);
2900 assert!(names.contains("etl"));
2901 }
2902
2903 #[test]
2904 fn referenced_schema_names_from_table_grants() {
2905 use pgroles_core::manifest::{
2906 ExpandedManifest, Grant, ObjectTarget, ObjectType, Privilege,
2907 };
2908 let expanded = ExpandedManifest {
2909 schemas: Vec::new(),
2910 roles: Vec::new(),
2911 grants: vec![Grant {
2912 role: "app".into(),
2913 privileges: vec![Privilege::Select],
2914 object: ObjectTarget {
2915 object_type: ObjectType::Table,
2916 schema: Some("analytics".into()),
2917 name: Some("*".into()),
2918 },
2919 }],
2920 default_privileges: Vec::new(),
2921 memberships: Vec::new(),
2922 };
2923 let names = referenced_schema_names(&expanded);
2924 assert!(names.contains("analytics"));
2925 }
2926
2927 #[test]
2928 fn referenced_schema_names_from_default_privileges() {
2929 use pgroles_core::manifest::{
2930 DefaultPrivilege, DefaultPrivilegeGrant, ExpandedManifest, ObjectType, Privilege,
2931 };
2932 let expanded = ExpandedManifest {
2933 schemas: Vec::new(),
2934 roles: Vec::new(),
2935 grants: Vec::new(),
2936 default_privileges: vec![DefaultPrivilege {
2937 owner: Some("app_owner".into()),
2938 schema: "reporting".into(),
2939 grant: vec![DefaultPrivilegeGrant {
2940 role: Some("app".into()),
2941 privileges: vec![Privilege::Select],
2942 on_type: ObjectType::Table,
2943 }],
2944 }],
2945 memberships: Vec::new(),
2946 };
2947 let names = referenced_schema_names(&expanded);
2948 assert!(names.contains("reporting"));
2949 }
2950
2951 #[test]
2952 fn referenced_schema_names_deduplicates_across_sources() {
2953 use pgroles_core::manifest::{
2954 DefaultPrivilege, DefaultPrivilegeGrant, ExpandedManifest, Grant, ObjectTarget,
2955 ObjectType, Privilege,
2956 };
2957 let expanded = ExpandedManifest {
2958 schemas: Vec::new(),
2959 roles: Vec::new(),
2960 grants: vec![
2961 Grant {
2962 role: "app".into(),
2963 privileges: vec![Privilege::Usage],
2964 object: ObjectTarget {
2965 object_type: ObjectType::Schema,
2966 schema: None,
2967 name: Some("shared".into()),
2968 },
2969 },
2970 Grant {
2971 role: "app".into(),
2972 privileges: vec![Privilege::Select],
2973 object: ObjectTarget {
2974 object_type: ObjectType::Table,
2975 schema: Some("shared".into()),
2976 name: Some("*".into()),
2977 },
2978 },
2979 ],
2980 default_privileges: vec![DefaultPrivilege {
2981 owner: Some("app_owner".into()),
2982 schema: "shared".into(),
2983 grant: vec![DefaultPrivilegeGrant {
2984 role: Some("app".into()),
2985 privileges: vec![Privilege::Select],
2986 on_type: ObjectType::Table,
2987 }],
2988 }],
2989 memberships: Vec::new(),
2990 };
2991 let names = referenced_schema_names(&expanded);
2992 assert_eq!(names.len(), 1);
2994 assert!(names.contains("shared"));
2995 }
2996
2997 #[test]
2998 fn referenced_schema_names_skips_database_and_roleless_grants() {
2999 use pgroles_core::manifest::{
3000 ExpandedManifest, Grant, ObjectTarget, ObjectType, Privilege,
3001 };
3002 let expanded = ExpandedManifest {
3003 schemas: Vec::new(),
3004 roles: Vec::new(),
3005 grants: vec![Grant {
3006 role: "app".into(),
3007 privileges: vec![Privilege::Connect],
3008 object: ObjectTarget {
3009 object_type: ObjectType::Database,
3010 schema: None,
3011 name: Some("mydb".into()),
3012 },
3013 }],
3014 default_privileges: Vec::new(),
3015 memberships: Vec::new(),
3016 };
3017 let names = referenced_schema_names(&expanded);
3018 assert!(
3019 names.is_empty(),
3020 "database-level grants should not contribute schema names"
3021 );
3022 }
3023
3024 #[test]
3025 fn is_system_schema_identifies_pg_and_information_schema() {
3026 assert!(is_system_schema("pg_catalog"));
3027 assert!(is_system_schema("pg_toast"));
3028 assert!(is_system_schema("pg_temp_1"));
3029 assert!(is_system_schema("information_schema"));
3030 assert!(!is_system_schema("public"));
3031 assert!(!is_system_schema("etl"));
3032 assert!(!is_system_schema("analytics"));
3033 }
3034
3035 #[test]
3036 fn referenced_schema_names_include_declared_schemas() {
3037 use pgroles_core::manifest::{ExpandedManifest, ExpandedSchema};
3038
3039 let expanded = ExpandedManifest {
3040 schemas: vec![ExpandedSchema {
3041 name: "cdc".into(),
3042 owner: Some("cdc_owner".into()),
3043 }],
3044 roles: Vec::new(),
3045 grants: Vec::new(),
3046 default_privileges: Vec::new(),
3047 memberships: Vec::new(),
3048 };
3049
3050 let names = referenced_schema_names(&expanded);
3051 assert!(names.contains("cdc"));
3052 }
3053
3054 #[test]
3055 fn declared_schema_names_returns_declared_only() {
3056 use pgroles_core::manifest::{ExpandedManifest, ExpandedSchema};
3057
3058 let expanded = ExpandedManifest {
3059 schemas: vec![ExpandedSchema {
3060 name: "cdc".into(),
3061 owner: Some("cdc_owner".into()),
3062 }],
3063 roles: Vec::new(),
3064 grants: Vec::new(),
3065 default_privileges: Vec::new(),
3066 memberships: Vec::new(),
3067 };
3068
3069 let names = declared_schema_names(&expanded);
3070 assert_eq!(names.len(), 1);
3071 assert!(names.contains("cdc"));
3072 }
3073
3074 #[test]
3075 fn externally_required_schema_names_excludes_declared_schemas() {
3076 use pgroles_core::manifest::{
3077 ExpandedManifest, ExpandedSchema, Grant, ObjectTarget, ObjectType, Privilege,
3078 };
3079
3080 let expanded = ExpandedManifest {
3081 schemas: vec![ExpandedSchema {
3082 name: "managed".into(),
3083 owner: Some("managed_owner".into()),
3084 }],
3085 roles: Vec::new(),
3086 grants: vec![
3087 Grant {
3088 role: "app".into(),
3089 privileges: vec![Privilege::Usage],
3090 object: ObjectTarget {
3091 object_type: ObjectType::Schema,
3092 schema: None,
3093 name: Some("managed".into()),
3094 },
3095 },
3096 Grant {
3097 role: "app".into(),
3098 privileges: vec![Privilege::Select],
3099 object: ObjectTarget {
3100 object_type: ObjectType::Table,
3101 schema: Some("external".into()),
3102 name: Some("*".into()),
3103 },
3104 },
3105 ],
3106 default_privileges: Vec::new(),
3107 memberships: Vec::new(),
3108 };
3109
3110 let names = externally_required_schema_names(&expanded);
3111 assert_eq!(names.len(), 1);
3112 assert!(names.contains("external"));
3113 assert!(!names.contains("managed"));
3114 }
3115
3116 #[test]
3117 fn error_reason_conflicting_policy() {
3118 let err = ReconcileError::ConflictingPolicy("overlaps with other".into());
3119 assert_eq!(err.reason(), "ConflictingPolicy");
3120 }
3121
3122 #[test]
3123 fn error_reason_unsafe_role_drops() {
3124 let err = ReconcileError::UnsafeRoleDrops("role owns objects".into());
3125 assert_eq!(err.reason(), "UnsafeRoleDrops");
3126 }
3127
3128 #[test]
3129 fn error_reason_no_namespace() {
3130 let err = ReconcileError::NoNamespace;
3131 assert_eq!(err.reason(), "InvalidResource");
3132 }
3133
3134 #[test]
3135 fn error_reason_context_secret_missing() {
3136 let err = ReconcileError::Context(Box::new(crate::context::ContextError::SecretMissing {
3137 name: "pg-secret".into(),
3138 key: "DATABASE_URL".into(),
3139 }));
3140 assert_eq!(err.reason(), "SecretMissing");
3141 }
3142
3143 #[test]
3144 fn error_reason_sql_exec_insufficient_privileges() {
3145 let err = ReconcileError::SqlExec(insufficient_privilege_sqlx_error());
3146 assert_eq!(err.reason(), "InsufficientPrivileges");
3147 }
3148
3149 #[test]
3150 fn error_reason_inspect_insufficient_privileges() {
3151 let err = ReconcileError::Inspect(pgroles_inspect::InspectError::Database(
3152 insufficient_privilege_sqlx_error(),
3153 ));
3154 assert_eq!(err.reason(), "InsufficientPrivileges");
3155 }
3156
3157 #[test]
3158 fn error_display_includes_details() {
3159 let err = ReconcileError::InvalidInterval("5x".into(), "unknown unit 'x'".into());
3160 let msg = err.to_string();
3161 assert!(msg.contains("5x"), "error display should contain interval");
3162 assert!(
3163 msg.contains("unknown unit"),
3164 "error display should contain reason"
3165 );
3166 }
3167
3168 #[test]
3169 fn error_reason_lock_contention() {
3170 let err = ReconcileError::LockContention(
3171 "prod/db-creds/DATABASE_URL".into(),
3172 "in-process lock held".into(),
3173 );
3174 assert_eq!(err.reason(), "LockContention");
3175 }
3176
3177 #[test]
3178 fn error_display_lock_contention_includes_database() {
3179 let err = ReconcileError::LockContention(
3180 "prod/db-creds/DATABASE_URL".into(),
3181 "advisory lock held by another session".into(),
3182 );
3183 let msg = err.to_string();
3184 assert!(
3185 msg.contains("prod/db-creds/DATABASE_URL"),
3186 "lock contention error should include database identity"
3187 );
3188 assert!(
3189 msg.contains("advisory lock"),
3190 "lock contention error should include reason"
3191 );
3192 }
3193
3194 #[test]
3195 fn requeue_with_jitter_produces_bounded_delay() {
3196 let base = LOCK_CONTENTION_BASE_SECS;
3198 let max = LOCK_CONTENTION_BASE_SECS + LOCK_CONTENTION_JITTER_SECS;
3199 for _ in 0..20 {
3200 let delay = jitter_delay();
3201 let secs = delay.as_secs();
3202 assert!(
3203 secs >= base,
3204 "jitter delay {secs}s should be at least base {base}s",
3205 );
3206 assert!(
3207 secs <= max,
3208 "jitter delay {secs}s should not exceed base+jitter {max}s",
3209 );
3210 }
3211 }
3212
3213 #[test]
3214 fn lock_contention_constants_are_reasonable() {
3215 let base = LOCK_CONTENTION_BASE_SECS;
3217 let jitter = LOCK_CONTENTION_JITTER_SECS;
3218 assert!(base > 0, "base delay must be positive");
3219 assert!(jitter > 0, "jitter window must be positive");
3220 assert!(
3221 base + jitter <= 60,
3222 "total max contention delay should not exceed error_policy's 60s"
3223 );
3224 }
3225
3226 #[test]
3227 fn transient_backoff_delay_is_bounded_and_caps() {
3228 for _ in 0..20 {
3229 let first = transient_backoff_delay(1).as_secs();
3230 assert!((TRANSIENT_BACKOFF_BASE_SECS..=7).contains(&first));
3231
3232 let fourth = transient_backoff_delay(4).as_secs();
3233 assert!((40..=60).contains(&fourth));
3234
3235 let capped = transient_backoff_delay(10).as_secs();
3236 assert_eq!(capped, TRANSIENT_BACKOFF_MAX_SECS);
3237 }
3238 }
3239
3240 #[test]
3241 fn slow_retry_delay_uses_policy_interval() {
3242 let resource = test_policy("7m", 0);
3243 assert_eq!(slow_retry_delay(&resource), Duration::from_secs(420));
3244 }
3245
3246 #[test]
3247 fn slow_retry_delay_falls_back_on_invalid_interval() {
3248 let resource = test_policy("nope", 0);
3249 assert_eq!(
3250 slow_retry_delay(&resource),
3251 Duration::from_secs(DEFAULT_REQUEUE_SECS)
3252 );
3253 }
3254
3255 #[test]
3256 fn retry_classifies_lock_contention_separately() {
3257 let error = finalizer::Error::ApplyFailed(ReconcileError::LockContention(
3258 "default/db-credentials/DATABASE_URL".into(),
3259 "lock held".into(),
3260 ));
3261 assert_eq!(retry_class(&error), RetryClass::LockContention);
3262 }
3263
3264 #[test]
3265 fn retry_classifies_invalid_spec_as_slow() {
3266 let error = finalizer::Error::ApplyFailed(ReconcileError::InvalidInterval(
3267 "oops".into(),
3268 "bad interval".into(),
3269 ));
3270 assert_eq!(retry_class(&error), RetryClass::Slow);
3271 }
3272
3273 #[test]
3274 fn retry_classifies_missing_database_objects_as_slow() {
3275 let error = finalizer::Error::ApplyFailed(ReconcileError::MissingDatabaseObjects(
3276 "schema \"etl\"".into(),
3277 ));
3278 assert_eq!(retry_class(&error), RetryClass::Slow);
3279 }
3280
3281 #[test]
3282 fn retry_classifies_unsatisfiable_wildcard_grant_as_slow() {
3283 let error = finalizer::Error::ApplyFailed(ReconcileError::UnsatisfiableWildcardGrant(
3284 "UnsatisfiableWildcardGrant: function f2() is not grantable".into(),
3285 ));
3286 assert_eq!(retry_class(&error), RetryClass::Slow);
3287 }
3288
3289 #[test]
3290 fn retry_classifies_plan_sql_storage_as_slow() {
3291 let error =
3292 finalizer::Error::ApplyFailed(ReconcileError::PlanSqlStorage("gzip failed".into()));
3293 assert_eq!(retry_class(&error), RetryClass::Slow);
3294 }
3295
3296 #[test]
3297 fn retry_classifies_secret_missing_as_slow() {
3298 let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3299 crate::context::ContextError::SecretMissing {
3300 name: "db-credentials".into(),
3301 key: "DATABASE_URL".into(),
3302 },
3303 )));
3304 assert_eq!(retry_class(&error), RetryClass::Slow);
3305 }
3306
3307 #[test]
3308 fn retry_classifies_secret_fetch_not_found_as_slow() {
3309 let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3310 crate::context::ContextError::SecretFetch {
3311 name: "db-credentials".into(),
3312 namespace: "default".into(),
3313 source: kube::Error::Api(
3314 kube::core::Status::failure("secrets \"db-credentials\" not found", "NotFound")
3315 .with_code(404)
3316 .boxed(),
3317 ),
3318 },
3319 )));
3320 assert_eq!(retry_class(&error), RetryClass::Slow);
3321 }
3322
3323 #[test]
3324 fn retry_classifies_secret_fetch_transport_errors_as_transient() {
3325 let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(
3326 crate::context::ContextError::SecretFetch {
3327 name: "db-credentials".into(),
3328 namespace: "default".into(),
3329 source: kube::Error::Api(
3330 kube::core::Status::failure("internal error", "InternalError")
3331 .with_code(500)
3332 .boxed(),
3333 ),
3334 },
3335 )));
3336 assert_eq!(retry_class(&error), RetryClass::Transient);
3337 }
3338
/// A `SecretFetch` error whose source is an API status with code 403 must map to
/// `RetryClass::Slow`.
#[test]
fn retry_classifies_secret_fetch_forbidden_as_slow() {
    let forbidden = kube::core::Status::failure("forbidden", "Forbidden")
        .with_code(403)
        .boxed();
    let fetch_failure = crate::context::ContextError::SecretFetch {
        name: "db-credentials".into(),
        namespace: "default".into(),
        source: kube::Error::Api(forbidden),
    };
    let wrapped = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(fetch_failure)));

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3354
/// `ContextError::DatabaseConnect` (here caused by `sqlx::Error::PoolTimedOut`) must map to
/// `RetryClass::Transient`.
#[test]
fn retry_classifies_database_connect_as_transient() {
    let connect_failure = crate::context::ContextError::DatabaseConnect {
        source: sqlx::Error::PoolTimedOut,
    };
    let wrapped =
        finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(connect_failure)));

    assert_eq!(retry_class(&wrapped), RetryClass::Transient);
}
3364
/// A `SqlExec` error built from `insufficient_privilege_sqlx_error()` must map to
/// `RetryClass::Slow`.
#[test]
fn retry_classifies_sql_exec_insufficient_privilege_as_slow() {
    let exec_failure = ReconcileError::SqlExec(insufficient_privilege_sqlx_error());
    let wrapped = finalizer::Error::ApplyFailed(exec_failure);

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3372
/// An `Inspect` error wrapping `insufficient_privilege_sqlx_error()` must map to
/// `RetryClass::Slow`.
#[test]
fn retry_classifies_inspect_insufficient_privilege_as_slow() {
    let inspect_failure =
        pgroles_inspect::InspectError::Database(insufficient_privilege_sqlx_error());
    let wrapped = finalizer::Error::ApplyFailed(ReconcileError::Inspect(inspect_failure));

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3380
/// Checks the `SqlErrorKind` that `classify_sqlx_error` assigns to each fixture error.
#[test]
fn classify_sqlx_error_categories() {
    // Privilege failures get their own category.
    let privilege_kind = classify_sqlx_error(&insufficient_privilege_sqlx_error());
    assert_eq!(privilege_kind, SqlErrorKind::InsufficientPrivileges);

    // Schema, table, function and generic object fixtures all share one category.
    let schema_kind = classify_sqlx_error(&missing_schema_sqlx_error());
    assert_eq!(schema_kind, SqlErrorKind::MissingDatabaseObject);

    let table_kind = classify_sqlx_error(&missing_table_sqlx_error());
    assert_eq!(table_kind, SqlErrorKind::MissingDatabaseObject);

    let function_kind = classify_sqlx_error(&missing_function_sqlx_error());
    assert_eq!(function_kind, SqlErrorKind::MissingDatabaseObject);

    let object_kind = classify_sqlx_error(&missing_object_sqlx_error());
    assert_eq!(object_kind, SqlErrorKind::MissingDatabaseObject);

    // The transient fixture is classified as transient.
    let transient_kind = classify_sqlx_error(&transient_sqlx_error());
    assert_eq!(transient_kind, SqlErrorKind::Transient);
}
3408
/// A `SqlExec` error built from `missing_schema_sqlx_error()` must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_sql_exec_missing_schema_as_slow() {
    let exec_failure = ReconcileError::SqlExec(missing_schema_sqlx_error());
    let wrapped = finalizer::Error::ApplyFailed(exec_failure);

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3415
/// A `SqlExec` error built from `missing_table_sqlx_error()` must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_sql_exec_missing_table_as_slow() {
    let exec_failure = ReconcileError::SqlExec(missing_table_sqlx_error());
    let wrapped = finalizer::Error::ApplyFailed(exec_failure);

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3422
/// An `Inspect` error wrapping `missing_schema_sqlx_error()` must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_inspect_missing_schema_as_slow() {
    let inspect_failure = pgroles_inspect::InspectError::Database(missing_schema_sqlx_error());
    let wrapped = finalizer::Error::ApplyFailed(ReconcileError::Inspect(inspect_failure));

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3430
/// `reason()` on a `SqlExec` error built from the missing-schema fixture must be
/// `"MissingDatabaseObject"`.
#[test]
fn error_reason_sql_exec_missing_database_object() {
    let reason = ReconcileError::SqlExec(missing_schema_sqlx_error()).reason();

    assert_eq!(reason, "MissingDatabaseObject");
}
3436
/// `reason()` on an `Inspect` error wrapping the missing-table fixture must be
/// `"MissingDatabaseObject"`.
#[test]
fn error_reason_inspect_missing_database_object() {
    let inspect_failure = pgroles_inspect::InspectError::Database(missing_table_sqlx_error());
    let err = ReconcileError::Inspect(inspect_failure);

    assert_eq!(err.reason(), "MissingDatabaseObject");
}
3444
/// `ContextError::EmptyResolvedValue` must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_empty_resolved_value_as_slow() {
    let empty_value = crate::context::ContextError::EmptyResolvedValue {
        field: String::from("password"),
    };
    let wrapped = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(empty_value)));

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3454
/// `reason()` for `ContextError::EmptyResolvedValue` must be `"InvalidConnectionParams"`.
#[test]
fn error_reason_empty_resolved_value() {
    let empty_value = crate::context::ContextError::EmptyResolvedValue {
        field: String::from("host"),
    };
    let err = ReconcileError::Context(Box::new(empty_value));

    assert_eq!(err.reason(), "InvalidConnectionParams");
}
3463
/// `ContextError::InvalidResolvedSslMode` must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_invalid_resolved_ssl_mode_as_slow() {
    let bad_ssl_mode = crate::context::ContextError::InvalidResolvedSslMode {
        value: String::from("bogus"),
    };
    let wrapped = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(bad_ssl_mode)));

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3473
/// `reason()` for `ContextError::InvalidResolvedSslMode` must be `"InvalidConnectionParams"`.
#[test]
fn error_reason_invalid_resolved_ssl_mode() {
    let bad_ssl_mode = crate::context::ContextError::InvalidResolvedSslMode {
        value: String::from("bogus"),
    };
    let err = ReconcileError::Context(Box::new(bad_ssl_mode));

    assert_eq!(err.reason(), "InvalidConnectionParams");
}
3483
/// `reason()` on a `SqlExec` error built from the transient fixture must be `"ApplyFailed"`.
#[test]
fn error_reason_sql_exec_transient_is_apply_failed() {
    let reason = ReconcileError::SqlExec(transient_sqlx_error()).reason();

    assert_eq!(reason, "ApplyFailed");
}
3489
/// `reason()` for `ReconcileError::PlanSqlStorage` must be `"PlanSqlStorageFailed"`.
#[test]
fn error_reason_plan_sql_storage_failed() {
    let reason = ReconcileError::PlanSqlStorage("gzip failed".into()).reason();

    assert_eq!(reason, "PlanSqlStorageFailed");
}
3495
/// For an `InvalidInterval` error, `retry_action` must requeue after 660 seconds when the
/// policy is built with `test_policy("11m", 0)`.
#[test]
fn error_policy_uses_normal_interval_for_invalid_spec() {
    let resource = test_policy("11m", 0);
    let invalid_interval = ReconcileError::InvalidInterval("oops".into(), "bad interval".into());
    let error = finalizer::Error::ApplyFailed(invalid_interval);

    // 660 seconds is eleven minutes, matching the "11m" passed to the fixture.
    let expected = Action::requeue(Duration::from_secs(660));
    assert_eq!(retry_action(&resource, &error), expected);
}
3508
/// For a `DatabaseConnect` error on a policy built with `test_policy("5m", 3)`,
/// `retry_action` must requeue after a whole number of seconds between 40 and 60 inclusive.
#[test]
fn error_policy_uses_exponential_backoff_for_transient_failures() {
    let resource = test_policy("5m", 3);
    let connect_failure = crate::context::ContextError::DatabaseConnect {
        source: sqlx::Error::PoolTimedOut,
    };
    let error = finalizer::Error::ApplyFailed(ReconcileError::Context(Box::new(connect_failure)));

    let action = retry_action(&resource, &error);

    // The exact delay is not pinned, so accept any whole-second delay in the window.
    let within_window = (40..=60)
        .map(Duration::from_secs)
        .any(|delay| action == Action::requeue(delay));
    assert!(
        within_window,
        "expected transient retry between 40s and 60s, got {action:?}"
    );
}
3523
/// The SQL rendered by `render_plan_sql_for_status` must contain `[REDACTED]` instead of the
/// cleartext password, while still showing the `CREATE ROLE` change.
#[test]
fn render_plan_sql_for_status_redacts_passwords() {
    use pgroles_core::diff::Change;
    use pgroles_core::model::RoleState;

    let login_role = RoleState {
        login: true,
        ..RoleState::default()
    };
    let changes = vec![
        Change::CreateRole {
            name: "app-svc".to_string(),
            state: login_role,
        },
        Change::SetPassword {
            name: "app-svc".to_string(),
            password: "super_secret_p@ssw0rd!".to_string(),
        },
    ];
    let sql_ctx = pgroles_core::sql::SqlContext::default();

    let (rendered, truncated) = render_plan_sql_for_status(&changes, &sql_ctx);
    let sql = rendered.expect("expected non-empty planned SQL");

    assert!(!truncated);
    assert!(
        sql.contains("[REDACTED]"),
        "status SQL should contain [REDACTED], got: {sql}"
    );
    assert!(
        !sql.contains("super_secret_p@ssw0rd!"),
        "status SQL must NOT contain the actual password, got: {sql}"
    );
    assert!(
        sql.contains("CREATE ROLE"),
        "status SQL should still contain non-password changes, got: {sql}"
    );
}
3558
/// With no changes, `render_plan_sql_for_status` must return `None` and report no truncation.
#[test]
fn render_plan_sql_for_status_empty_changes_returns_none() {
    let sql_ctx = pgroles_core::sql::SqlContext::default();

    let (rendered, was_truncated) = render_plan_sql_for_status(&[], &sql_ctx);

    assert!(rendered.is_none());
    assert!(!was_truncated);
}
3566
/// A plan consisting only of a `SetPassword` change must still render SQL, with the
/// password replaced by `[REDACTED]`. The truncation flag is not checked here.
#[test]
fn render_plan_sql_for_status_password_only_plan() {
    use pgroles_core::diff::Change;

    let password_change = Change::SetPassword {
        name: "db-user".to_string(),
        password: "my_secret_pw".to_string(),
    };
    let changes = vec![password_change];
    let sql_ctx = pgroles_core::sql::SqlContext::default();

    let (rendered, _) = render_plan_sql_for_status(&changes, &sql_ctx);
    let sql = rendered.expect("expected non-empty planned SQL");

    assert!(
        sql.contains("[REDACTED]"),
        "password-only plan should still show redacted SQL"
    );
    assert!(
        !sql.contains("my_secret_pw"),
        "password-only plan must NOT leak the password"
    );
}
3587
/// `reason()` for `ReconcileError::EmptyPasswordSecret` must be `"InvalidSpec"`.
#[test]
fn error_reason_empty_password_secret() {
    let reason = ReconcileError::EmptyPasswordSecret {
        role: String::from("app-svc"),
        secret: String::from("pg-passwords"),
        key: String::from("app-svc"),
    }
    .reason();

    assert_eq!(reason, "InvalidSpec");
}
3597
/// `ReconcileError::EmptyPasswordSecret` must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_empty_password_secret_as_slow() {
    let empty_secret = ReconcileError::EmptyPasswordSecret {
        role: String::from("app-svc"),
        secret: String::from("pg-passwords"),
        key: String::from("app-svc"),
    };
    let wrapped = finalizer::Error::ApplyFailed(empty_secret);

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3607
/// `reason()` for a `PasswordGeneration` error wrapping `PasswordError::MissingKey` must be
/// `"SecretFetchFailed"`.
#[test]
fn error_reason_password_generation() {
    let missing_key = crate::password::PasswordError::MissingKey {
        secret: String::from("my-secret"),
        key: String::from("password"),
    };
    let err = ReconcileError::PasswordGeneration(Box::new(missing_key));

    assert_eq!(err.reason(), "SecretFetchFailed");
}
3618
/// A `PasswordGeneration` error wrapping `PasswordError::MissingKey` must map to
/// `RetryClass::Slow`.
#[test]
fn retry_classifies_password_generation_missing_key_as_slow() {
    let missing_key = crate::password::PasswordError::MissingKey {
        secret: String::from("my-secret"),
        key: String::from("password"),
    };
    let generation_failure = ReconcileError::PasswordGeneration(Box::new(missing_key));
    let wrapped = finalizer::Error::ApplyFailed(generation_failure);

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3629
/// A `PasswordGeneration` error wrapping `PasswordError::KubeApi` with an API status of
/// code 500 must map to `RetryClass::Transient`.
#[test]
fn retry_classifies_password_generation_kube_server_error_as_transient() {
    let server_error = kube::core::Status::failure("internal error", "InternalError")
        .with_code(500)
        .boxed();
    let kube_failure = crate::password::PasswordError::KubeApi {
        secret: String::from("my-secret"),
        source: Box::new(kube::Error::Api(server_error)),
    };
    let generation_failure = ReconcileError::PasswordGeneration(Box::new(kube_failure));
    let wrapped = finalizer::Error::ApplyFailed(generation_failure);

    assert_eq!(retry_class(&wrapped), RetryClass::Transient);
}
3644
/// A `PasswordGeneration` error wrapping `PasswordError::KubeApi` with an API status of
/// code 403 must map to `RetryClass::Slow`.
#[test]
fn retry_classifies_password_generation_kube_forbidden_as_slow() {
    let forbidden = kube::core::Status::failure("forbidden", "Forbidden")
        .with_code(403)
        .boxed();
    let kube_failure = crate::password::PasswordError::KubeApi {
        secret: String::from("my-secret"),
        source: Box::new(kube::Error::Api(forbidden)),
    };
    let generation_failure = ReconcileError::PasswordGeneration(Box::new(kube_failure));
    let wrapped = finalizer::Error::ApplyFailed(generation_failure);

    assert_eq!(retry_class(&wrapped), RetryClass::Slow);
}
3659
/// Accumulating one `SetPassword` change into a default `ChangeSummary` must leave
/// `passwords_set` at 1.
#[test]
fn accumulate_summary_counts_passwords() {
    use pgroles_core::diff::Change;

    let password_change = Change::SetPassword {
        name: String::from("app-svc"),
        password: String::from("secret"),
    };
    let mut summary = ChangeSummary::default();

    accumulate_summary(&mut summary, &password_change);

    assert_eq!(summary.passwords_set, 1);
}
3674
/// When the only peer is the policy from `invalid_profile_policy`, conflict detection must
/// report no conflict.
#[test]
fn conflict_detection_ignores_invalid_peer_policies() {
    let resource = valid_role_policy("valid-policy", "analytics", "shared-db-secret");
    let identity = DatabaseIdentity::from_connection("default", &resource.spec.connection);
    let ownership = resource.spec.ownership_claims().unwrap();

    let peers = vec![invalid_profile_policy("invalid-peer", "shared-db-secret")];
    let conflict = detect_policy_conflict_in_list(&resource, &identity, &ownership, peers);

    assert_eq!(conflict, None);
}
3687
/// With a cached Secret holding the keys `app` and `reporter-password`, resolution must
/// yield the matching cleartext for the roles `app` and `reporter`.
#[test]
fn resolve_passwords_from_cached_secrets_supports_default_and_explicit_keys() {
    let resource = password_role_policy();
    let mut cache = BTreeMap::new();
    cache.insert(
        "role-passwords".to_string(),
        secret_with_keys(
            "role-passwords",
            &[
                ("app", "app-secret"),
                ("reporter-password", "reporter-secret"),
            ],
        ),
    );

    let resolved =
        resolve_passwords_from_cached_secrets(&resource, &cache).expect("should resolve");

    // Each role must resolve to the value stored under its Secret key.
    for (role, expected_cleartext) in [("app", "app-secret"), ("reporter", "reporter-secret")] {
        assert_eq!(
            resolved.get(role).map(|entry| entry.cleartext.as_str()),
            Some(expected_cleartext)
        );
    }
}
3718
/// When the cached Secret only has the `app` key, resolution must fail with
/// `ContextError::SecretMissing` naming the Secret `role-passwords` and the key
/// `reporter-password`.
#[test]
fn resolve_passwords_from_cached_secrets_reports_missing_key() {
    let resource = password_role_policy();
    let mut cache = BTreeMap::new();
    cache.insert(
        "role-passwords".to_string(),
        secret_with_keys("role-passwords", &[("app", "app-secret")]),
    );

    // Any error variant other than `Context` fails the test immediately.
    let context = match resolve_passwords_from_cached_secrets(&resource, &cache).unwrap_err() {
        ReconcileError::Context(context) => context,
        other => panic!("expected context error, got {other:?}"),
    };

    assert!(matches!(
        &*context,
        crate::context::ContextError::SecretMissing { name, key }
            if name == "role-passwords" && key == "reporter-password"
    ));
}
3738
/// When the `app` key holds an empty string, resolution must fail with
/// `ReconcileError::EmptyPasswordSecret` for role `app`, Secret `role-passwords`, key `app`.
#[test]
fn resolve_passwords_from_cached_secrets_reports_empty_password() {
    let resource = password_role_policy();
    let mut cache = BTreeMap::new();
    cache.insert(
        "role-passwords".to_string(),
        secret_with_keys(
            "role-passwords",
            &[("app", ""), ("reporter-password", "ok")],
        ),
    );

    let failure = resolve_passwords_from_cached_secrets(&resource, &cache).unwrap_err();

    assert!(matches!(
        &failure,
        ReconcileError::EmptyPasswordSecret { role, secret, key }
            if role == "app" && secret == "role-passwords" && key == "app"
    ));
}
3757
/// Passwords made of, or starting with, whitespace must resolve successfully and be
/// returned unchanged.
#[test]
fn resolve_passwords_from_cached_secrets_allows_whitespace_passwords() {
    let resource = password_role_policy();
    let mut cache = BTreeMap::new();
    cache.insert(
        "role-passwords".to_string(),
        secret_with_keys(
            "role-passwords",
            &[("app", " "), ("reporter-password", "\tsecret")],
        ),
    );

    let resolved =
        resolve_passwords_from_cached_secrets(&resource, &cache).expect("should resolve");

    // The cleartext must be byte-for-byte what the Secret contained.
    for (role, expected_cleartext) in [("app", " "), ("reporter", "\tsecret")] {
        assert_eq!(
            resolved.get(role).map(|entry| entry.cleartext.as_str()),
            Some(expected_cleartext)
        );
    }
}
3785
/// When the resolved source version equals the one recorded in status, no password change
/// is selected, but the current version is still reported.
#[test]
fn select_password_changes_skips_unchanged_password_sources() {
    let mut resolved = BTreeMap::new();
    resolved.insert(
        "app".to_string(),
        ResolvedPassword {
            cleartext: "app-secret".to_string(),
            source_version: "role-passwords:app:7".to_string(),
        },
    );

    // Status records the same source version as the resolved password.
    let mut applied_versions = BTreeMap::new();
    applied_versions.insert("app".to_string(), "role-passwords:app:7".to_string());
    let status = PostgresPolicyStatus {
        applied_password_source_versions: applied_versions,
        ..Default::default()
    };

    let (password_changes, current_versions) =
        select_password_changes(&[], &resolved, Some(&status));

    assert!(password_changes.is_empty());
    assert_eq!(
        current_versions.get("app").map(String::as_str),
        Some("role-passwords:app:7")
    );
}
3812
/// When the resolved source version differs from the one recorded in status, the new
/// cleartext must be selected for the role.
#[test]
fn select_password_changes_applies_on_source_version_change() {
    let mut resolved = BTreeMap::new();
    resolved.insert(
        "app".to_string(),
        ResolvedPassword {
            cleartext: "new-secret".to_string(),
            source_version: "role-passwords:app:8".to_string(),
        },
    );

    // Status still records version 7, while the resolved password is at version 8.
    let mut applied_versions = BTreeMap::new();
    applied_versions.insert("app".to_string(), "role-passwords:app:7".to_string());
    let status = PostgresPolicyStatus {
        applied_password_source_versions: applied_versions,
        ..Default::default()
    };

    let (password_changes, _) = select_password_changes(&[], &resolved, Some(&status));

    assert_eq!(
        password_changes.get("app").map(String::as_str),
        Some("new-secret")
    );
}
3837
/// Even when the source version matches the one in status, a `CreateRole` change for the
/// same role must cause its password to be selected.
#[test]
fn select_password_changes_applies_for_newly_created_role() {
    use pgroles_core::diff::Change;
    use pgroles_core::model::RoleState;

    let mut resolved = BTreeMap::new();
    resolved.insert(
        "app".to_string(),
        ResolvedPassword {
            cleartext: "new-secret".to_string(),
            source_version: "role-passwords:app:7".to_string(),
        },
    );

    // Same version as the resolved password: only the role creation differs.
    let mut applied_versions = BTreeMap::new();
    applied_versions.insert("app".to_string(), "role-passwords:app:7".to_string());
    let status = PostgresPolicyStatus {
        applied_password_source_versions: applied_versions,
        ..Default::default()
    };

    let create_app_role = Change::CreateRole {
        name: "app".to_string(),
        state: RoleState {
            login: true,
            ..RoleState::default()
        },
    };
    let changes = vec![create_app_role];

    let (password_changes, _) = select_password_changes(&changes, &resolved, Some(&status));

    assert_eq!(
        password_changes.get("app").map(String::as_str),
        Some("new-secret")
    );
}
3872
/// With no status at all (`None`), every resolved password must be selected and every
/// source version tracked.
#[test]
fn select_password_changes_applies_all_on_first_reconcile() {
    let mut resolved = BTreeMap::new();
    resolved.insert(
        "app".to_string(),
        ResolvedPassword {
            cleartext: "secret-a".to_string(),
            source_version: "role-passwords:app:1".to_string(),
        },
    );
    resolved.insert(
        "reporter".to_string(),
        ResolvedPassword {
            cleartext: "secret-b".to_string(),
            source_version: "role-passwords:reporter:1".to_string(),
        },
    );
    // No planned changes: selection is driven only by the missing status.
    let changes: Vec<pgroles_core::diff::Change> = Vec::new();

    let (password_changes, versions) = select_password_changes(&changes, &resolved, None);

    assert_eq!(
        password_changes.len(),
        2,
        "all passwords should be applied on first reconcile"
    );
    assert_eq!(
        password_changes.get("app").map(String::as_str),
        Some("secret-a")
    );
    assert_eq!(
        password_changes.get("reporter").map(String::as_str),
        Some("secret-b")
    );
    assert_eq!(versions.len(), 2, "all source versions should be tracked");
}
3912
/// With an invalid peer and a peer claiming the same `analytics` role, the conflict message
/// must mention the overlapping peer and the contested role.
#[test]
fn conflict_detection_still_reports_overlapping_valid_peers() {
    let resource = valid_role_policy("valid-policy", "analytics", "shared-db-secret");
    let identity = DatabaseIdentity::from_connection("default", &resource.spec.connection);
    let ownership = resource.spec.ownership_claims().unwrap();

    // The invalid peer is listed first, ahead of the overlapping one.
    let peers = vec![
        invalid_profile_policy("invalid-peer", "shared-db-secret"),
        valid_role_policy("overlapping-peer", "analytics", "shared-db-secret"),
    ];

    let message = detect_policy_conflict_in_list(&resource, &identity, &ownership, peers)
        .expect("expected overlapping peer to be reported");

    assert!(message.contains("overlapping-peer"));
    assert!(message.contains("roles: analytics"));
}
3933
/// Midnight UTC on 2024-01-01 must parse to 1704067200 seconds since the Unix epoch.
#[test]
fn parse_rfc3339_to_epoch_secs_known_timestamp() {
    let epoch_secs = parse_rfc3339_to_epoch_secs("2024-01-01T00:00:00Z");

    assert_eq!(epoch_secs, Some(1_704_067_200));
}
3940
/// A timestamp with a time-of-day component must include it in the result:
/// 12:30:45 is 45045 seconds past the 2024-01-01 midnight value of 1704067200.
#[test]
fn parse_rfc3339_to_epoch_secs_with_time() {
    let epoch_secs = parse_rfc3339_to_epoch_secs("2024-01-01T12:30:45Z");

    assert_eq!(epoch_secs, Some(1_704_112_245));
}
3947
/// Text that is not a timestamp, and the empty string, must both parse to `None`.
#[test]
fn parse_rfc3339_to_epoch_secs_invalid_returns_none() {
    for malformed in ["not-a-date", ""] {
        assert_eq!(parse_rfc3339_to_epoch_secs(malformed), None);
    }
}
3953
/// Timestamps produced by `crate::crd::now_rfc3339` must be parseable by
/// `parse_rfc3339_to_epoch_secs`, and the parsed value must be within two seconds of the
/// system clock read immediately afterwards.
#[test]
fn parse_rfc3339_roundtrips_with_now_rfc3339() {
    let timestamp = crate::crd::now_rfc3339();
    // Pass the generated timestamp by reference; the argument had been corrupted into a
    // non-compiling token.
    let parsed =
        parse_rfc3339_to_epoch_secs(&timestamp).expect("should parse our own timestamps");

    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // `abs_diff` avoids underflow whichever of the two values is larger.
    let diff = now_secs.abs_diff(parsed);
    assert!(diff <= 2, "parsed time should be close to now, diff={diff}");
}
3967}