// pgroles_operator/events.rs

1//! Transition-based Kubernetes Events for `PostgresPolicy` resources.
2//!
3//! Events complement status conditions:
4//! - status remains the source of truth for the current state
5//! - Events surface notable transitions in `kubectl describe`
6//! - OTLP metrics remain the fleet-level observability path
7
8use k8s_openapi::api::core::v1::ObjectReference;
9use kube::Resource;
10use kube::runtime::events::{Event, EventType, Recorder};
11
12use crate::crd::{PolicyCondition, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyStatus};
13
14/// Publish Kubernetes Events for notable status transitions.
15pub async fn publish_status_events(
16    recorder: &Recorder,
17    resource: &PostgresPolicy,
18    old_status: Option<&PostgresPolicyStatus>,
19    new_status: &PostgresPolicyStatus,
20) -> Result<(), kube::Error> {
21    let reference: ObjectReference = resource.object_ref(&());
22    for event in derive_status_events(old_status, new_status) {
23        recorder.publish(&event, &reference).await?;
24    }
25    Ok(())
26}
27
28/// Publish a plan lifecycle event on the parent policy.
29pub async fn publish_plan_event(
30    recorder: &Recorder,
31    policy: &PostgresPolicy,
32    plan: &PostgresPolicyPlan,
33    event_type: PlanEventType,
34) -> Result<(), kube::Error> {
35    let reference: ObjectReference = policy.object_ref(&());
36    let plan_name = kube::ResourceExt::name_any(plan);
37    let event = match event_type {
38        PlanEventType::Created { change_count } => event(
39            EventType::Normal,
40            "PlanCreated",
41            "PlanLifecycle",
42            format!("Plan {plan_name} created with {change_count} change(s)"),
43        ),
44        PlanEventType::Approved => event(
45            EventType::Normal,
46            "PlanApproved",
47            "PlanLifecycle",
48            format!("Plan {plan_name} approved"),
49        ),
50        PlanEventType::Rejected => event(
51            EventType::Normal,
52            "PlanRejected",
53            "PlanLifecycle",
54            format!("Plan {plan_name} rejected"),
55        ),
56        PlanEventType::ApplyStarted => event(
57            EventType::Normal,
58            "ApplyStarted",
59            "PlanLifecycle",
60            format!("Executing plan {plan_name}"),
61        ),
62        PlanEventType::ApplySucceeded => event(
63            EventType::Normal,
64            "ApplySucceeded",
65            "PlanLifecycle",
66            format!("Plan {plan_name} applied successfully"),
67        ),
68        PlanEventType::ApplyFailed { error } => event(
69            EventType::Warning,
70            "ApplyFailed",
71            "PlanLifecycle",
72            format!("Plan {plan_name} failed: {error}"),
73        ),
74    };
75    recorder.publish(&event, &reference).await
76}
77
/// Types of plan lifecycle events.
///
/// Each variant selects the reason and message of the Event published by
/// `publish_plan_event`. The derives let callers log a variant with `{:?}`,
/// clone it, and compare it in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanEventType {
    /// A new plan was computed.
    Created {
        /// Number of changes in the plan, shown in the event message.
        change_count: i32,
    },
    /// Approval annotation detected on a plan.
    Approved,
    /// Rejection annotation detected on a plan.
    Rejected,
    /// Plan execution has started.
    ApplyStarted,
    /// Plan execution completed successfully.
    ApplySucceeded,
    /// Plan execution failed.
    ApplyFailed {
        /// Failure description, appended to the event message.
        error: String,
    },
}
93
94fn derive_status_events(
95    old_status: Option<&PostgresPolicyStatus>,
96    new_status: &PostgresPolicyStatus,
97) -> Vec<Event> {
98    let mut events = Vec::new();
99
100    if transitioned_to_true(old_status, new_status, "Conflict") {
101        let note = condition_message(new_status, "Conflict")
102            .or_else(|| new_status.last_error.clone())
103            .unwrap_or_else(|| "Policy ownership conflict detected".to_string());
104        events.push(event(
105            EventType::Warning,
106            "ConflictDetected",
107            "StatusTransition",
108            note,
109        ));
110    }
111
112    if transitioned_from_true(old_status, new_status, "Conflict") {
113        events.push(event(
114            EventType::Normal,
115            "ConflictResolved",
116            "StatusTransition",
117            "Policy ownership conflict resolved".to_string(),
118        ));
119    }
120
121    if transitioned_to_true(old_status, new_status, "Paused") {
122        let note = condition_message(new_status, "Paused")
123            .unwrap_or_else(|| "Reconciliation suspended by spec".to_string());
124        events.push(event(
125            EventType::Normal,
126            "Suspended",
127            "StatusTransition",
128            note,
129        ));
130    }
131
132    if transitioned_to_true(old_status, new_status, "Drifted") {
133        let note = condition_message(new_status, "Drifted")
134            .unwrap_or_else(|| "Planned changes are pending review".to_string());
135        events.push(event(
136            EventType::Normal,
137            "DriftDetected",
138            "StatusTransition",
139            note,
140        ));
141    }
142
143    if plan_became_clean(old_status, new_status) {
144        let note = condition_message(new_status, "Drifted")
145            .unwrap_or_else(|| "Plan computed; database already matches desired state".to_string());
146        events.push(event(
147            EventType::Normal,
148            "PlanClean",
149            "StatusTransition",
150            note,
151        ));
152    }
153
154    if ready_became_true(old_status, new_status) && !is_planned_ready(new_status) {
155        let reason = if had_ready_condition(old_status) {
156            "Recovered"
157        } else {
158            "Reconciled"
159        };
160        let note = condition_message(new_status, "Ready")
161            .unwrap_or_else(|| "Policy reconciled successfully".to_string());
162        events.push(event(EventType::Normal, reason, "StatusTransition", note));
163    }
164
165    if let Some(reason) = noteworthy_failure_reason(old_status, new_status) {
166        let note = condition_message(new_status, "Ready")
167            .or_else(|| new_status.last_error.clone())
168            .unwrap_or_else(|| format!("Policy entered {reason} state"));
169        events.push(event(EventType::Warning, reason, "StatusTransition", note));
170    }
171
172    events
173}
174
175fn event(type_: EventType, reason: &str, action: &str, note: String) -> Event {
176    Event {
177        type_,
178        reason: reason.to_string(),
179        note: Some(note),
180        action: action.to_string(),
181        secondary: None,
182    }
183}
184
185fn condition<'a>(
186    status: &'a PostgresPolicyStatus,
187    condition_type: &str,
188) -> Option<&'a PolicyCondition> {
189    status
190        .conditions
191        .iter()
192        .find(|condition| condition.condition_type == condition_type)
193}
194
195fn condition_status<'a>(
196    status: Option<&'a PostgresPolicyStatus>,
197    condition_type: &str,
198) -> Option<&'a str> {
199    status
200        .and_then(|status| condition(status, condition_type))
201        .map(|condition| condition.status.as_str())
202}
203
204fn condition_reason<'a>(
205    status: Option<&'a PostgresPolicyStatus>,
206    condition_type: &str,
207) -> Option<&'a str> {
208    status
209        .and_then(|status| condition(status, condition_type))
210        .and_then(|condition| condition.reason.as_deref())
211}
212
213fn condition_message(status: &PostgresPolicyStatus, condition_type: &str) -> Option<String> {
214    condition(status, condition_type).and_then(|condition| condition.message.clone())
215}
216
217fn condition_is_true(status: Option<&PostgresPolicyStatus>, condition_type: &str) -> bool {
218    condition_status(status, condition_type) == Some("True")
219}
220
221fn transitioned_to_true(
222    old_status: Option<&PostgresPolicyStatus>,
223    new_status: &PostgresPolicyStatus,
224    condition_type: &str,
225) -> bool {
226    !condition_is_true(old_status, condition_type)
227        && condition_is_true(Some(new_status), condition_type)
228}
229
230fn transitioned_from_true(
231    old_status: Option<&PostgresPolicyStatus>,
232    new_status: &PostgresPolicyStatus,
233    condition_type: &str,
234) -> bool {
235    condition_is_true(old_status, condition_type)
236        && !condition_is_true(Some(new_status), condition_type)
237}
238
239fn was_ready(old_status: Option<&PostgresPolicyStatus>) -> bool {
240    condition_is_true(old_status, "Ready")
241}
242
243fn had_ready_condition(old_status: Option<&PostgresPolicyStatus>) -> bool {
244    old_status
245        .and_then(|status| condition(status, "Ready"))
246        .is_some()
247}
248
249fn ready_became_true(
250    old_status: Option<&PostgresPolicyStatus>,
251    new_status: &PostgresPolicyStatus,
252) -> bool {
253    !was_ready(old_status) && condition_is_true(Some(new_status), "Ready")
254}
255
256fn is_planned_ready(status: &PostgresPolicyStatus) -> bool {
257    condition(status, "Ready").and_then(|ready| ready.reason.as_deref()) == Some("Planned")
258}
259
260fn plan_became_clean(
261    old_status: Option<&PostgresPolicyStatus>,
262    new_status: &PostgresPolicyStatus,
263) -> bool {
264    if !is_planned_ready(new_status) || condition_is_true(Some(new_status), "Drifted") {
265        return false;
266    }
267
268    !is_planned_ready_status(old_status) || condition_is_true(old_status, "Drifted")
269}
270
271fn is_planned_ready_status(status: Option<&PostgresPolicyStatus>) -> bool {
272    status.map(is_planned_ready).unwrap_or(false)
273}
274
275fn noteworthy_failure_reason(
276    old_status: Option<&PostgresPolicyStatus>,
277    new_status: &PostgresPolicyStatus,
278) -> Option<&'static str> {
279    let ready = condition(new_status, "Ready")?;
280    if ready.status != "False" {
281        return None;
282    }
283
284    let reason = ready.reason.as_deref()?;
285    if matches!(reason, "ConflictingPolicy" | "Suspended") {
286        return None;
287    }
288
289    let mapped_reason = match reason {
290        "InvalidSpec" => "InvalidSpec",
291        "SecretMissing" | "SecretFetchFailed" => "SecretFetchFailed",
292        "DatabaseConnectionFailed" => "DatabaseConnectionFailed",
293        "InsufficientPrivileges" => "InsufficientPrivileges",
294        "UnsafeRoleDrops" => "UnsafeRoleDropsBlocked",
295        _ => return None,
296    };
297
298    let old_ready_status = condition_status(old_status, "Ready");
299    let old_ready_reason = condition_reason(old_status, "Ready");
300
301    if old_ready_status == Some("False") && old_ready_reason == Some(reason) {
302        None
303    } else {
304        Some(mapped_reason)
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        PostgresPolicyStatus, conflict_condition, drifted_condition, paused_condition,
        ready_condition,
    };

    /// Start from a default status and let `configure` fill it in.
    fn status_with(configure: impl FnOnce(&mut PostgresPolicyStatus)) -> PostgresPolicyStatus {
        let mut status = PostgresPolicyStatus::default();
        configure(&mut status);
        status
    }

    /// Reasons of the events derived for a transition, in emission order.
    fn emitted_reasons(
        old_status: Option<&PostgresPolicyStatus>,
        new_status: &PostgresPolicyStatus,
    ) -> Vec<String> {
        derive_status_events(old_status, new_status)
            .into_iter()
            .map(|emitted| emitted.reason)
            .collect()
    }

    #[test]
    fn emits_conflict_detected_when_conflict_condition_becomes_true() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
            status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
            status.last_error = Some("overlap".to_string());
        });

        assert_eq!(emitted_reasons(None, &current), ["ConflictDetected"]);
    }

    #[test]
    fn emits_conflict_resolved_and_recovered_when_policy_recovers_from_conflict() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
            status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });

        assert_eq!(
            emitted_reasons(Some(&previous), &current),
            ["ConflictResolved", "Recovered"]
        );
    }

    #[test]
    fn emits_suspended_when_policy_is_paused() {
        let current = status_with(|status| {
            status.set_condition(paused_condition("Reconciliation suspended by spec"));
            status.set_condition(ready_condition(
                false,
                "Suspended",
                "Reconciliation suspended by spec",
            ));
        });

        assert_eq!(emitted_reasons(None, &current), ["Suspended"]);
    }

    #[test]
    fn emits_reconciled_on_first_success() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });

        assert_eq!(emitted_reasons(None, &current), ["Reconciled"]);
    }

    #[test]
    fn emits_recovered_when_transitioning_from_not_ready_to_ready() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "DatabaseConnectionFailed",
                "database unavailable",
            ));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });

        assert_eq!(emitted_reasons(Some(&previous), &current), ["Recovered"]);
    }

    #[test]
    fn emits_secret_fetch_failed_when_missing_secret_first_detected() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "SecretMissing",
                "Secret \"db\" does not contain key \"DATABASE_URL\"",
            ));
            status.last_error =
                Some("Secret \"db\" does not contain key \"DATABASE_URL\"".to_string());
        });

        assert_eq!(emitted_reasons(None, &current), ["SecretFetchFailed"]);
    }

    #[test]
    fn does_not_repeat_same_failure_event_without_transition() {
        let failing = |status: &mut PostgresPolicyStatus| {
            status.set_condition(ready_condition(
                false,
                "DatabaseConnectionFailed",
                "connection refused",
            ));
        };
        let previous = status_with(failing);
        let current = status_with(failing);

        assert!(emitted_reasons(Some(&previous), &current).is_empty());
    }

    #[test]
    fn emits_insufficient_privileges_on_failure_transition() {
        let previous = status_with(|status| {
            status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));
        });
        let current = status_with(|status| {
            status.set_condition(ready_condition(
                false,
                "InsufficientPrivileges",
                "permission denied to create role",
            ));
        });

        assert_eq!(
            emitted_reasons(Some(&previous), &current),
            ["InsufficientPrivileges"]
        );
    }

    #[test]
    fn emits_drift_detected_for_plan_mode_with_pending_changes() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Planned", "Plan computed"));
            status.set_condition(drifted_condition(
                true,
                "DriftDetected",
                "2 planned change(s) pending review",
            ));
        });

        assert_eq!(emitted_reasons(None, &current), ["DriftDetected"]);
    }

    #[test]
    fn emits_plan_clean_when_plan_mode_has_no_pending_changes() {
        let current = status_with(|status| {
            status.set_condition(ready_condition(true, "Planned", "Plan computed"));
            status.set_condition(drifted_condition(false, "InSync", "No pending changes"));
        });

        assert_eq!(emitted_reasons(None, &current), ["PlanClean"]);
    }
}
456}