1use k8s_openapi::api::core::v1::ObjectReference;
9use kube::Resource;
10use kube::runtime::events::{Event, EventType, Recorder};
11
12use crate::crd::{PolicyCondition, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyStatus};
13
14pub async fn publish_status_events(
16 recorder: &Recorder,
17 resource: &PostgresPolicy,
18 old_status: Option<&PostgresPolicyStatus>,
19 new_status: &PostgresPolicyStatus,
20) -> Result<(), kube::Error> {
21 let reference: ObjectReference = resource.object_ref(&());
22 for event in derive_status_events(old_status, new_status) {
23 recorder.publish(&event, &reference).await?;
24 }
25 Ok(())
26}
27
28pub async fn publish_plan_event(
30 recorder: &Recorder,
31 policy: &PostgresPolicy,
32 plan: &PostgresPolicyPlan,
33 event_type: PlanEventType,
34) -> Result<(), kube::Error> {
35 let reference: ObjectReference = policy.object_ref(&());
36 let plan_name = kube::ResourceExt::name_any(plan);
37 let event = match event_type {
38 PlanEventType::Created { change_count } => event(
39 EventType::Normal,
40 "PlanCreated",
41 "PlanLifecycle",
42 format!("Plan {plan_name} created with {change_count} change(s)"),
43 ),
44 PlanEventType::Approved => event(
45 EventType::Normal,
46 "PlanApproved",
47 "PlanLifecycle",
48 format!("Plan {plan_name} approved"),
49 ),
50 PlanEventType::Rejected => event(
51 EventType::Normal,
52 "PlanRejected",
53 "PlanLifecycle",
54 format!("Plan {plan_name} rejected"),
55 ),
56 PlanEventType::ApplyStarted => event(
57 EventType::Normal,
58 "ApplyStarted",
59 "PlanLifecycle",
60 format!("Executing plan {plan_name}"),
61 ),
62 PlanEventType::ApplySucceeded => event(
63 EventType::Normal,
64 "ApplySucceeded",
65 "PlanLifecycle",
66 format!("Plan {plan_name} applied successfully"),
67 ),
68 PlanEventType::ApplyFailed { error } => event(
69 EventType::Warning,
70 "ApplyFailed",
71 "PlanLifecycle",
72 format!("Plan {plan_name} failed: {error}"),
73 ),
74 };
75 recorder.publish(&event, &reference).await
76}
77
78pub enum PlanEventType {
80 Created { change_count: i32 },
82 Approved,
84 Rejected,
86 ApplyStarted,
88 ApplySucceeded,
90 ApplyFailed { error: String },
92}
93
94fn derive_status_events(
95 old_status: Option<&PostgresPolicyStatus>,
96 new_status: &PostgresPolicyStatus,
97) -> Vec<Event> {
98 let mut events = Vec::new();
99
100 if transitioned_to_true(old_status, new_status, "Conflict") {
101 let note = condition_message(new_status, "Conflict")
102 .or_else(|| new_status.last_error.clone())
103 .unwrap_or_else(|| "Policy ownership conflict detected".to_string());
104 events.push(event(
105 EventType::Warning,
106 "ConflictDetected",
107 "StatusTransition",
108 note,
109 ));
110 }
111
112 if transitioned_from_true(old_status, new_status, "Conflict") {
113 events.push(event(
114 EventType::Normal,
115 "ConflictResolved",
116 "StatusTransition",
117 "Policy ownership conflict resolved".to_string(),
118 ));
119 }
120
121 if transitioned_to_true(old_status, new_status, "Paused") {
122 let note = condition_message(new_status, "Paused")
123 .unwrap_or_else(|| "Reconciliation suspended by spec".to_string());
124 events.push(event(
125 EventType::Normal,
126 "Suspended",
127 "StatusTransition",
128 note,
129 ));
130 }
131
132 if transitioned_to_true(old_status, new_status, "Drifted") {
133 let note = condition_message(new_status, "Drifted")
134 .unwrap_or_else(|| "Planned changes are pending review".to_string());
135 events.push(event(
136 EventType::Normal,
137 "DriftDetected",
138 "StatusTransition",
139 note,
140 ));
141 }
142
143 if plan_became_clean(old_status, new_status) {
144 let note = condition_message(new_status, "Drifted")
145 .unwrap_or_else(|| "Plan computed; database already matches desired state".to_string());
146 events.push(event(
147 EventType::Normal,
148 "PlanClean",
149 "StatusTransition",
150 note,
151 ));
152 }
153
154 if ready_became_true(old_status, new_status) && !is_planned_ready(new_status) {
155 let reason = if had_ready_condition(old_status) {
156 "Recovered"
157 } else {
158 "Reconciled"
159 };
160 let note = condition_message(new_status, "Ready")
161 .unwrap_or_else(|| "Policy reconciled successfully".to_string());
162 events.push(event(EventType::Normal, reason, "StatusTransition", note));
163 }
164
165 if let Some(reason) = noteworthy_failure_reason(old_status, new_status) {
166 let note = condition_message(new_status, "Ready")
167 .or_else(|| new_status.last_error.clone())
168 .unwrap_or_else(|| format!("Policy entered {reason} state"));
169 events.push(event(EventType::Warning, reason, "StatusTransition", note));
170 }
171
172 events
173}
174
175fn event(type_: EventType, reason: &str, action: &str, note: String) -> Event {
176 Event {
177 type_,
178 reason: reason.to_string(),
179 note: Some(note),
180 action: action.to_string(),
181 secondary: None,
182 }
183}
184
185fn condition<'a>(
186 status: &'a PostgresPolicyStatus,
187 condition_type: &str,
188) -> Option<&'a PolicyCondition> {
189 status
190 .conditions
191 .iter()
192 .find(|condition| condition.condition_type == condition_type)
193}
194
195fn condition_status<'a>(
196 status: Option<&'a PostgresPolicyStatus>,
197 condition_type: &str,
198) -> Option<&'a str> {
199 status
200 .and_then(|status| condition(status, condition_type))
201 .map(|condition| condition.status.as_str())
202}
203
204fn condition_reason<'a>(
205 status: Option<&'a PostgresPolicyStatus>,
206 condition_type: &str,
207) -> Option<&'a str> {
208 status
209 .and_then(|status| condition(status, condition_type))
210 .and_then(|condition| condition.reason.as_deref())
211}
212
213fn condition_message(status: &PostgresPolicyStatus, condition_type: &str) -> Option<String> {
214 condition(status, condition_type).and_then(|condition| condition.message.clone())
215}
216
217fn condition_is_true(status: Option<&PostgresPolicyStatus>, condition_type: &str) -> bool {
218 condition_status(status, condition_type) == Some("True")
219}
220
221fn transitioned_to_true(
222 old_status: Option<&PostgresPolicyStatus>,
223 new_status: &PostgresPolicyStatus,
224 condition_type: &str,
225) -> bool {
226 !condition_is_true(old_status, condition_type)
227 && condition_is_true(Some(new_status), condition_type)
228}
229
230fn transitioned_from_true(
231 old_status: Option<&PostgresPolicyStatus>,
232 new_status: &PostgresPolicyStatus,
233 condition_type: &str,
234) -> bool {
235 condition_is_true(old_status, condition_type)
236 && !condition_is_true(Some(new_status), condition_type)
237}
238
239fn was_ready(old_status: Option<&PostgresPolicyStatus>) -> bool {
240 condition_is_true(old_status, "Ready")
241}
242
243fn had_ready_condition(old_status: Option<&PostgresPolicyStatus>) -> bool {
244 old_status
245 .and_then(|status| condition(status, "Ready"))
246 .is_some()
247}
248
249fn ready_became_true(
250 old_status: Option<&PostgresPolicyStatus>,
251 new_status: &PostgresPolicyStatus,
252) -> bool {
253 !was_ready(old_status) && condition_is_true(Some(new_status), "Ready")
254}
255
256fn is_planned_ready(status: &PostgresPolicyStatus) -> bool {
257 condition(status, "Ready").and_then(|ready| ready.reason.as_deref()) == Some("Planned")
258}
259
260fn plan_became_clean(
261 old_status: Option<&PostgresPolicyStatus>,
262 new_status: &PostgresPolicyStatus,
263) -> bool {
264 if !is_planned_ready(new_status) || condition_is_true(Some(new_status), "Drifted") {
265 return false;
266 }
267
268 !is_planned_ready_status(old_status) || condition_is_true(old_status, "Drifted")
269}
270
271fn is_planned_ready_status(status: Option<&PostgresPolicyStatus>) -> bool {
272 status.map(is_planned_ready).unwrap_or(false)
273}
274
275fn noteworthy_failure_reason(
276 old_status: Option<&PostgresPolicyStatus>,
277 new_status: &PostgresPolicyStatus,
278) -> Option<&'static str> {
279 let ready = condition(new_status, "Ready")?;
280 if ready.status != "False" {
281 return None;
282 }
283
284 let reason = ready.reason.as_deref()?;
285 if matches!(reason, "ConflictingPolicy" | "Suspended") {
286 return None;
287 }
288
289 let mapped_reason = match reason {
290 "InvalidSpec" => "InvalidSpec",
291 "SecretMissing" | "SecretFetchFailed" => "SecretFetchFailed",
292 "DatabaseConnectionFailed" => "DatabaseConnectionFailed",
293 "GcpAuthFailed" => "GcpAuthFailed",
294 "InsufficientPrivileges" => "InsufficientPrivileges",
295 "UnsafeRoleDrops" => "UnsafeRoleDropsBlocked",
296 _ => return None,
297 };
298
299 let old_ready_status = condition_status(old_status, "Ready");
300 let old_ready_reason = condition_reason(old_status, "Ready");
301
302 if old_ready_status == Some("False") && old_ready_reason == Some(reason) {
303 None
304 } else {
305 Some(mapped_reason)
306 }
307}
308
309#[cfg(test)]
310mod tests {
311 use super::*;
312 use crate::crd::{
313 PostgresPolicyStatus, conflict_condition, drifted_condition, paused_condition,
314 ready_condition,
315 };
316
317 fn reasons(events: &[Event]) -> Vec<&str> {
318 events.iter().map(|event| event.reason.as_str()).collect()
319 }
320
321 #[test]
322 fn emits_conflict_detected_when_conflict_condition_becomes_true() {
323 let mut status = PostgresPolicyStatus::default();
324 status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
325 status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
326 status.last_error = Some("overlap".to_string());
327
328 let events = derive_status_events(None, &status);
329 assert_eq!(reasons(&events), vec!["ConflictDetected"]);
330 }
331
332 #[test]
333 fn emits_conflict_resolved_and_recovered_when_policy_recovers_from_conflict() {
334 let mut old_status = PostgresPolicyStatus::default();
335 old_status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
336 old_status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
337
338 let mut new_status = PostgresPolicyStatus::default();
339 new_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
340
341 let events = derive_status_events(Some(&old_status), &new_status);
342 assert_eq!(reasons(&events), vec!["ConflictResolved", "Recovered"]);
343 }
344
345 #[test]
346 fn emits_suspended_when_policy_is_paused() {
347 let mut status = PostgresPolicyStatus::default();
348 status.set_condition(paused_condition("Reconciliation suspended by spec"));
349 status.set_condition(ready_condition(
350 false,
351 "Suspended",
352 "Reconciliation suspended by spec",
353 ));
354
355 let events = derive_status_events(None, &status);
356 assert_eq!(reasons(&events), vec!["Suspended"]);
357 }
358
359 #[test]
360 fn emits_reconciled_on_first_success() {
361 let mut status = PostgresPolicyStatus::default();
362 status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
363
364 let events = derive_status_events(None, &status);
365 assert_eq!(reasons(&events), vec!["Reconciled"]);
366 }
367
368 #[test]
369 fn emits_recovered_when_transitioning_from_not_ready_to_ready() {
370 let mut old_status = PostgresPolicyStatus::default();
371 old_status.set_condition(ready_condition(
372 false,
373 "DatabaseConnectionFailed",
374 "database unavailable",
375 ));
376
377 let mut new_status = PostgresPolicyStatus::default();
378 new_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
379
380 let events = derive_status_events(Some(&old_status), &new_status);
381 assert_eq!(reasons(&events), vec!["Recovered"]);
382 }
383
384 #[test]
385 fn emits_secret_fetch_failed_when_missing_secret_first_detected() {
386 let mut status = PostgresPolicyStatus::default();
387 status.set_condition(ready_condition(
388 false,
389 "SecretMissing",
390 "Secret \"db\" does not contain key \"DATABASE_URL\"",
391 ));
392 status.last_error = Some("Secret \"db\" does not contain key \"DATABASE_URL\"".to_string());
393
394 let events = derive_status_events(None, &status);
395 assert_eq!(reasons(&events), vec!["SecretFetchFailed"]);
396 }
397
398 #[test]
399 fn does_not_repeat_same_failure_event_without_transition() {
400 let mut old_status = PostgresPolicyStatus::default();
401 old_status.set_condition(ready_condition(
402 false,
403 "DatabaseConnectionFailed",
404 "connection refused",
405 ));
406
407 let mut new_status = PostgresPolicyStatus::default();
408 new_status.set_condition(ready_condition(
409 false,
410 "DatabaseConnectionFailed",
411 "connection refused",
412 ));
413
414 let events = derive_status_events(Some(&old_status), &new_status);
415 assert!(events.is_empty());
416 }
417
418 #[test]
419 fn emits_insufficient_privileges_on_failure_transition() {
420 let mut old_status = PostgresPolicyStatus::default();
421 old_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
422
423 let mut new_status = PostgresPolicyStatus::default();
424 new_status.set_condition(ready_condition(
425 false,
426 "InsufficientPrivileges",
427 "permission denied to create role",
428 ));
429
430 let events = derive_status_events(Some(&old_status), &new_status);
431 assert_eq!(reasons(&events), vec!["InsufficientPrivileges"]);
432 }
433
434 #[test]
435 fn emits_gcp_auth_failed_on_failure_transition() {
436 let mut old_status = PostgresPolicyStatus::default();
437 old_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
438
439 let mut new_status = PostgresPolicyStatus::default();
440 new_status.set_condition(ready_condition(
441 false,
442 "GcpAuthFailed",
443 "token request rejected",
444 ));
445
446 let events = derive_status_events(Some(&old_status), &new_status);
447 assert_eq!(reasons(&events), vec!["GcpAuthFailed"]);
448 assert!(matches!(events[0].type_, EventType::Warning));
449 assert_eq!(events[0].note.as_deref(), Some("token request rejected"));
450 }
451
452 #[test]
453 fn emits_drift_detected_for_plan_mode_with_pending_changes() {
454 let mut status = PostgresPolicyStatus::default();
455 status.set_condition(ready_condition(true, "Planned", "Plan computed"));
456 status.set_condition(drifted_condition(
457 true,
458 "DriftDetected",
459 "2 planned change(s) pending review",
460 ));
461
462 let events = derive_status_events(None, &status);
463 assert_eq!(reasons(&events), vec!["DriftDetected"]);
464 }
465
466 #[test]
467 fn emits_plan_clean_when_plan_mode_has_no_pending_changes() {
468 let mut status = PostgresPolicyStatus::default();
469 status.set_condition(ready_condition(true, "Planned", "Plan computed"));
470 status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
471
472 let events = derive_status_events(None, &status);
473 assert_eq!(reasons(&events), vec!["PlanClean"]);
474 }
475}