1use k8s_openapi::api::core::v1::ObjectReference;
9use kube::Resource;
10use kube::runtime::events::{Event, EventType, Recorder};
11
12use crate::crd::{PolicyCondition, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyStatus};
13
14pub async fn publish_status_events(
16 recorder: &Recorder,
17 resource: &PostgresPolicy,
18 old_status: Option<&PostgresPolicyStatus>,
19 new_status: &PostgresPolicyStatus,
20) -> Result<(), kube::Error> {
21 let reference: ObjectReference = resource.object_ref(&());
22 for event in derive_status_events(old_status, new_status) {
23 recorder.publish(&event, &reference).await?;
24 }
25 Ok(())
26}
27
28pub async fn publish_plan_event(
30 recorder: &Recorder,
31 policy: &PostgresPolicy,
32 plan: &PostgresPolicyPlan,
33 event_type: PlanEventType,
34) -> Result<(), kube::Error> {
35 let reference: ObjectReference = policy.object_ref(&());
36 let plan_name = kube::ResourceExt::name_any(plan);
37 let event = match event_type {
38 PlanEventType::Created { change_count } => event(
39 EventType::Normal,
40 "PlanCreated",
41 "PlanLifecycle",
42 format!("Plan {plan_name} created with {change_count} change(s)"),
43 ),
44 PlanEventType::Approved => event(
45 EventType::Normal,
46 "PlanApproved",
47 "PlanLifecycle",
48 format!("Plan {plan_name} approved"),
49 ),
50 PlanEventType::Rejected => event(
51 EventType::Normal,
52 "PlanRejected",
53 "PlanLifecycle",
54 format!("Plan {plan_name} rejected"),
55 ),
56 PlanEventType::ApplyStarted => event(
57 EventType::Normal,
58 "ApplyStarted",
59 "PlanLifecycle",
60 format!("Executing plan {plan_name}"),
61 ),
62 PlanEventType::ApplySucceeded => event(
63 EventType::Normal,
64 "ApplySucceeded",
65 "PlanLifecycle",
66 format!("Plan {plan_name} applied successfully"),
67 ),
68 PlanEventType::ApplyFailed { error } => event(
69 EventType::Warning,
70 "ApplyFailed",
71 "PlanLifecycle",
72 format!("Plan {plan_name} failed: {error}"),
73 ),
74 };
75 recorder.publish(&event, &reference).await
76}
77
/// Lifecycle stage of a plan, used to select the event that
/// `publish_plan_event` emits.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged,
/// duplicated and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanEventType {
    /// A plan was created; `change_count` is the number of changes reported
    /// in the event note.
    Created { change_count: i32 },
    /// The plan was approved.
    Approved,
    /// The plan was rejected.
    Rejected,
    /// Execution of the plan has started.
    ApplyStarted,
    /// The plan was applied successfully.
    ApplySucceeded,
    /// Applying the plan failed; `error` is included in the event note.
    ApplyFailed { error: String },
}
93
94fn derive_status_events(
95 old_status: Option<&PostgresPolicyStatus>,
96 new_status: &PostgresPolicyStatus,
97) -> Vec<Event> {
98 let mut events = Vec::new();
99
100 if transitioned_to_true(old_status, new_status, "Conflict") {
101 let note = condition_message(new_status, "Conflict")
102 .or_else(|| new_status.last_error.clone())
103 .unwrap_or_else(|| "Policy ownership conflict detected".to_string());
104 events.push(event(
105 EventType::Warning,
106 "ConflictDetected",
107 "StatusTransition",
108 note,
109 ));
110 }
111
112 if transitioned_from_true(old_status, new_status, "Conflict") {
113 events.push(event(
114 EventType::Normal,
115 "ConflictResolved",
116 "StatusTransition",
117 "Policy ownership conflict resolved".to_string(),
118 ));
119 }
120
121 if transitioned_to_true(old_status, new_status, "Paused") {
122 let note = condition_message(new_status, "Paused")
123 .unwrap_or_else(|| "Reconciliation suspended by spec".to_string());
124 events.push(event(
125 EventType::Normal,
126 "Suspended",
127 "StatusTransition",
128 note,
129 ));
130 }
131
132 if transitioned_to_true(old_status, new_status, "Drifted") {
133 let note = condition_message(new_status, "Drifted")
134 .unwrap_or_else(|| "Planned changes are pending review".to_string());
135 events.push(event(
136 EventType::Normal,
137 "DriftDetected",
138 "StatusTransition",
139 note,
140 ));
141 }
142
143 if plan_became_clean(old_status, new_status) {
144 let note = condition_message(new_status, "Drifted")
145 .unwrap_or_else(|| "Plan computed; database already matches desired state".to_string());
146 events.push(event(
147 EventType::Normal,
148 "PlanClean",
149 "StatusTransition",
150 note,
151 ));
152 }
153
154 if ready_became_true(old_status, new_status) && !is_planned_ready(new_status) {
155 let reason = if had_ready_condition(old_status) {
156 "Recovered"
157 } else {
158 "Reconciled"
159 };
160 let note = condition_message(new_status, "Ready")
161 .unwrap_or_else(|| "Policy reconciled successfully".to_string());
162 events.push(event(EventType::Normal, reason, "StatusTransition", note));
163 }
164
165 if let Some(reason) = noteworthy_failure_reason(old_status, new_status) {
166 let note = condition_message(new_status, "Ready")
167 .or_else(|| new_status.last_error.clone())
168 .unwrap_or_else(|| format!("Policy entered {reason} state"));
169 events.push(event(EventType::Warning, reason, "StatusTransition", note));
170 }
171
172 events
173}
174
175fn event(type_: EventType, reason: &str, action: &str, note: String) -> Event {
176 Event {
177 type_,
178 reason: reason.to_string(),
179 note: Some(note),
180 action: action.to_string(),
181 secondary: None,
182 }
183}
184
185fn condition<'a>(
186 status: &'a PostgresPolicyStatus,
187 condition_type: &str,
188) -> Option<&'a PolicyCondition> {
189 status
190 .conditions
191 .iter()
192 .find(|condition| condition.condition_type == condition_type)
193}
194
195fn condition_status<'a>(
196 status: Option<&'a PostgresPolicyStatus>,
197 condition_type: &str,
198) -> Option<&'a str> {
199 status
200 .and_then(|status| condition(status, condition_type))
201 .map(|condition| condition.status.as_str())
202}
203
204fn condition_reason<'a>(
205 status: Option<&'a PostgresPolicyStatus>,
206 condition_type: &str,
207) -> Option<&'a str> {
208 status
209 .and_then(|status| condition(status, condition_type))
210 .and_then(|condition| condition.reason.as_deref())
211}
212
213fn condition_message(status: &PostgresPolicyStatus, condition_type: &str) -> Option<String> {
214 condition(status, condition_type).and_then(|condition| condition.message.clone())
215}
216
217fn condition_is_true(status: Option<&PostgresPolicyStatus>, condition_type: &str) -> bool {
218 condition_status(status, condition_type) == Some("True")
219}
220
221fn transitioned_to_true(
222 old_status: Option<&PostgresPolicyStatus>,
223 new_status: &PostgresPolicyStatus,
224 condition_type: &str,
225) -> bool {
226 !condition_is_true(old_status, condition_type)
227 && condition_is_true(Some(new_status), condition_type)
228}
229
230fn transitioned_from_true(
231 old_status: Option<&PostgresPolicyStatus>,
232 new_status: &PostgresPolicyStatus,
233 condition_type: &str,
234) -> bool {
235 condition_is_true(old_status, condition_type)
236 && !condition_is_true(Some(new_status), condition_type)
237}
238
239fn was_ready(old_status: Option<&PostgresPolicyStatus>) -> bool {
240 condition_is_true(old_status, "Ready")
241}
242
243fn had_ready_condition(old_status: Option<&PostgresPolicyStatus>) -> bool {
244 old_status
245 .and_then(|status| condition(status, "Ready"))
246 .is_some()
247}
248
249fn ready_became_true(
250 old_status: Option<&PostgresPolicyStatus>,
251 new_status: &PostgresPolicyStatus,
252) -> bool {
253 !was_ready(old_status) && condition_is_true(Some(new_status), "Ready")
254}
255
256fn is_planned_ready(status: &PostgresPolicyStatus) -> bool {
257 condition(status, "Ready").and_then(|ready| ready.reason.as_deref()) == Some("Planned")
258}
259
260fn plan_became_clean(
261 old_status: Option<&PostgresPolicyStatus>,
262 new_status: &PostgresPolicyStatus,
263) -> bool {
264 if !is_planned_ready(new_status) || condition_is_true(Some(new_status), "Drifted") {
265 return false;
266 }
267
268 !is_planned_ready_status(old_status) || condition_is_true(old_status, "Drifted")
269}
270
271fn is_planned_ready_status(status: Option<&PostgresPolicyStatus>) -> bool {
272 status.map(is_planned_ready).unwrap_or(false)
273}
274
275fn noteworthy_failure_reason(
276 old_status: Option<&PostgresPolicyStatus>,
277 new_status: &PostgresPolicyStatus,
278) -> Option<&'static str> {
279 let ready = condition(new_status, "Ready")?;
280 if ready.status != "False" {
281 return None;
282 }
283
284 let reason = ready.reason.as_deref()?;
285 if matches!(reason, "ConflictingPolicy" | "Suspended") {
286 return None;
287 }
288
289 let mapped_reason = match reason {
290 "InvalidSpec" => "InvalidSpec",
291 "SecretMissing" | "SecretFetchFailed" => "SecretFetchFailed",
292 "DatabaseConnectionFailed" => "DatabaseConnectionFailed",
293 "InsufficientPrivileges" => "InsufficientPrivileges",
294 "UnsafeRoleDrops" => "UnsafeRoleDropsBlocked",
295 _ => return None,
296 };
297
298 let old_ready_status = condition_status(old_status, "Ready");
299 let old_ready_reason = condition_reason(old_status, "Ready");
300
301 if old_ready_status == Some("False") && old_ready_reason == Some(reason) {
302 None
303 } else {
304 Some(mapped_reason)
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        PostgresPolicyStatus, conflict_condition, drifted_condition, paused_condition,
        ready_condition,
    };

    /// Builds a default status and lets `configure` populate it.
    fn status_with(configure: impl FnOnce(&mut PostgresPolicyStatus)) -> PostgresPolicyStatus {
        let mut status = PostgresPolicyStatus::default();
        configure(&mut status);
        status
    }

    /// Collects the `reason` of every event, preserving order.
    fn reasons(events: &[Event]) -> Vec<&str> {
        let mut collected = Vec::with_capacity(events.len());
        for emitted in events {
            collected.push(emitted.reason.as_str());
        }
        collected
    }

    #[test]
    fn emits_conflict_detected_when_conflict_condition_becomes_true() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
            status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
            status.last_error = Some("overlap".to_string());
        });

        let emitted = derive_status_events(None, &current);

        assert_eq!(reasons(&emitted), ["ConflictDetected"]);
    }

    #[test]
    fn emits_conflict_resolved_and_recovered_when_policy_recovers_from_conflict() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
            status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });

        let emitted = derive_status_events(Some(&previous), &current);

        assert_eq!(reasons(&emitted), ["ConflictResolved", "Recovered"]);
    }

    #[test]
    fn emits_suspended_when_policy_is_paused() {
        let current = status_with(|status| {
            status.set_condition(paused_condition("Reconciliation suspended by spec"));
            status.set_condition(ready_condition(
                false,
                "Suspended",
                "Reconciliation suspended by spec",
            ));
        });

        let emitted = derive_status_events(None, &current);

        assert_eq!(reasons(&emitted), ["Suspended"]);
    }

    #[test]
    fn emits_reconciled_on_first_success() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });

        let emitted = derive_status_events(None, &current);

        assert_eq!(reasons(&emitted), ["Reconciled"]);
    }

    #[test]
    fn emits_recovered_when_transitioning_from_not_ready_to_ready() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "DatabaseConnectionFailed",
                "database unavailable",
            ));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });

        let emitted = derive_status_events(Some(&previous), &current);

        assert_eq!(reasons(&emitted), ["Recovered"]);
    }

    #[test]
    fn emits_secret_fetch_failed_when_missing_secret_first_detected() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "SecretMissing",
                "Secret \"db\" does not contain key \"DATABASE_URL\"",
            ));
            status.last_error =
                Some("Secret \"db\" does not contain key \"DATABASE_URL\"".to_string());
        });

        let emitted = derive_status_events(None, &current);

        assert_eq!(reasons(&emitted), ["SecretFetchFailed"]);
    }

    #[test]
    fn does_not_repeat_same_failure_event_without_transition() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "DatabaseConnectionFailed",
                "connection refused",
            ));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "DatabaseConnectionFailed",
                "connection refused",
            ));
        });

        let emitted = derive_status_events(Some(&previous), &current);

        assert!(emitted.is_empty());
    }

    #[test]
    fn emits_insufficient_privileges_on_failure_transition() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "InsufficientPrivileges",
                "permission denied to create role",
            ));
        });

        let emitted = derive_status_events(Some(&previous), &current);

        assert_eq!(reasons(&emitted), ["InsufficientPrivileges"]);
    }

    #[test]
    fn emits_drift_detected_for_plan_mode_with_pending_changes() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Planned", "Plan computed"));
            status.set_condition(drifted_condition(
                true,
                "DriftDetected",
                "2 planned change(s) pending review",
            ));
        });

        let emitted = derive_status_events(None, &current);

        assert_eq!(reasons(&emitted), ["DriftDetected"]);
    }

    #[test]
    fn emits_plan_clean_when_plan_mode_has_no_pending_changes() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Planned", "Plan computed"));
            status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
        });

        let emitted = derive_status_events(None, &current);

        assert_eq!(reasons(&emitted), ["PlanClean"]);
    }
}