Skip to main content

pgroles_operator/
events.rs

//! Transition-based Kubernetes Events for `PostgresPolicy` resources.
//!
//! Events complement status conditions:
//! - status conditions remain the source of truth for the current state
//! - Events surface notable transitions in `kubectl describe`
//! - OTLP metrics remain the fleet-level observability path
7
8use k8s_openapi::api::core::v1::ObjectReference;
9use kube::Resource;
10use kube::runtime::events::{Event, EventType, Recorder};
11
12use crate::crd::{PolicyCondition, PostgresPolicy, PostgresPolicyPlan, PostgresPolicyStatus};
13
14/// Publish Kubernetes Events for notable status transitions.
15pub async fn publish_status_events(
16    recorder: &Recorder,
17    resource: &PostgresPolicy,
18    old_status: Option<&PostgresPolicyStatus>,
19    new_status: &PostgresPolicyStatus,
20) -> Result<(), kube::Error> {
21    let reference: ObjectReference = resource.object_ref(&());
22    for event in derive_status_events(old_status, new_status) {
23        recorder.publish(&event, &reference).await?;
24    }
25    Ok(())
26}
27
28/// Publish a plan lifecycle event on the parent policy.
29pub async fn publish_plan_event(
30    recorder: &Recorder,
31    policy: &PostgresPolicy,
32    plan: &PostgresPolicyPlan,
33    event_type: PlanEventType,
34) -> Result<(), kube::Error> {
35    let reference: ObjectReference = policy.object_ref(&());
36    let plan_name = kube::ResourceExt::name_any(plan);
37    let event = match event_type {
38        PlanEventType::Created { change_count } => event(
39            EventType::Normal,
40            "PlanCreated",
41            "PlanLifecycle",
42            format!("Plan {plan_name} created with {change_count} change(s)"),
43        ),
44        PlanEventType::Approved => event(
45            EventType::Normal,
46            "PlanApproved",
47            "PlanLifecycle",
48            format!("Plan {plan_name} approved"),
49        ),
50        PlanEventType::Rejected => event(
51            EventType::Normal,
52            "PlanRejected",
53            "PlanLifecycle",
54            format!("Plan {plan_name} rejected"),
55        ),
56        PlanEventType::ApplyStarted => event(
57            EventType::Normal,
58            "ApplyStarted",
59            "PlanLifecycle",
60            format!("Executing plan {plan_name}"),
61        ),
62        PlanEventType::ApplySucceeded => event(
63            EventType::Normal,
64            "ApplySucceeded",
65            "PlanLifecycle",
66            format!("Plan {plan_name} applied successfully"),
67        ),
68        PlanEventType::ApplyFailed { error } => event(
69            EventType::Warning,
70            "ApplyFailed",
71            "PlanLifecycle",
72            format!("Plan {plan_name} failed: {error}"),
73        ),
74    };
75    recorder.publish(&event, &reference).await
76}
77
/// Types of plan lifecycle events.
///
/// Each variant maps to one Event reason published by `publish_plan_event`.
/// Only `ApplyFailed` is published as a Warning; the others are Normal events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanEventType {
    /// A new plan was computed.
    Created {
        /// Number of changes in the computed plan, included in the event note.
        change_count: i32,
    },
    /// Approval annotation detected on a plan.
    Approved,
    /// Rejection annotation detected on a plan.
    Rejected,
    /// Plan execution has started.
    ApplyStarted,
    /// Plan execution completed successfully.
    ApplySucceeded,
    /// Plan execution failed.
    ApplyFailed {
        /// Error text appended to the event note.
        error: String,
    },
}
93
94fn derive_status_events(
95    old_status: Option<&PostgresPolicyStatus>,
96    new_status: &PostgresPolicyStatus,
97) -> Vec<Event> {
98    let mut events = Vec::new();
99
100    if transitioned_to_true(old_status, new_status, "Conflict") {
101        let note = condition_message(new_status, "Conflict")
102            .or_else(|| new_status.last_error.clone())
103            .unwrap_or_else(|| "Policy ownership conflict detected".to_string());
104        events.push(event(
105            EventType::Warning,
106            "ConflictDetected",
107            "StatusTransition",
108            note,
109        ));
110    }
111
112    if transitioned_from_true(old_status, new_status, "Conflict") {
113        events.push(event(
114            EventType::Normal,
115            "ConflictResolved",
116            "StatusTransition",
117            "Policy ownership conflict resolved".to_string(),
118        ));
119    }
120
121    if transitioned_to_true(old_status, new_status, "Paused") {
122        let note = condition_message(new_status, "Paused")
123            .unwrap_or_else(|| "Reconciliation suspended by spec".to_string());
124        events.push(event(
125            EventType::Normal,
126            "Suspended",
127            "StatusTransition",
128            note,
129        ));
130    }
131
132    if transitioned_to_true(old_status, new_status, "Drifted") {
133        let note = condition_message(new_status, "Drifted")
134            .unwrap_or_else(|| "Planned changes are pending review".to_string());
135        events.push(event(
136            EventType::Normal,
137            "DriftDetected",
138            "StatusTransition",
139            note,
140        ));
141    }
142
143    if plan_became_clean(old_status, new_status) {
144        let note = condition_message(new_status, "Drifted")
145            .unwrap_or_else(|| "Plan computed; database already matches desired state".to_string());
146        events.push(event(
147            EventType::Normal,
148            "PlanClean",
149            "StatusTransition",
150            note,
151        ));
152    }
153
154    if ready_became_true(old_status, new_status) && !is_planned_ready(new_status) {
155        let reason = if had_ready_condition(old_status) {
156            "Recovered"
157        } else {
158            "Reconciled"
159        };
160        let note = condition_message(new_status, "Ready")
161            .unwrap_or_else(|| "Policy reconciled successfully".to_string());
162        events.push(event(EventType::Normal, reason, "StatusTransition", note));
163    }
164
165    if let Some(reason) = noteworthy_failure_reason(old_status, new_status) {
166        let note = condition_message(new_status, "Ready")
167            .or_else(|| new_status.last_error.clone())
168            .unwrap_or_else(|| format!("Policy entered {reason} state"));
169        events.push(event(EventType::Warning, reason, "StatusTransition", note));
170    }
171
172    events
173}
174
175fn event(type_: EventType, reason: &str, action: &str, note: String) -> Event {
176    Event {
177        type_,
178        reason: reason.to_string(),
179        note: Some(note),
180        action: action.to_string(),
181        secondary: None,
182    }
183}
184
185fn condition<'a>(
186    status: &'a PostgresPolicyStatus,
187    condition_type: &str,
188) -> Option<&'a PolicyCondition> {
189    status
190        .conditions
191        .iter()
192        .find(|condition| condition.condition_type == condition_type)
193}
194
195fn condition_status<'a>(
196    status: Option<&'a PostgresPolicyStatus>,
197    condition_type: &str,
198) -> Option<&'a str> {
199    status
200        .and_then(|status| condition(status, condition_type))
201        .map(|condition| condition.status.as_str())
202}
203
204fn condition_reason<'a>(
205    status: Option<&'a PostgresPolicyStatus>,
206    condition_type: &str,
207) -> Option<&'a str> {
208    status
209        .and_then(|status| condition(status, condition_type))
210        .and_then(|condition| condition.reason.as_deref())
211}
212
213fn condition_message(status: &PostgresPolicyStatus, condition_type: &str) -> Option<String> {
214    condition(status, condition_type).and_then(|condition| condition.message.clone())
215}
216
217fn condition_is_true(status: Option<&PostgresPolicyStatus>, condition_type: &str) -> bool {
218    condition_status(status, condition_type) == Some("True")
219}
220
221fn transitioned_to_true(
222    old_status: Option<&PostgresPolicyStatus>,
223    new_status: &PostgresPolicyStatus,
224    condition_type: &str,
225) -> bool {
226    !condition_is_true(old_status, condition_type)
227        && condition_is_true(Some(new_status), condition_type)
228}
229
230fn transitioned_from_true(
231    old_status: Option<&PostgresPolicyStatus>,
232    new_status: &PostgresPolicyStatus,
233    condition_type: &str,
234) -> bool {
235    condition_is_true(old_status, condition_type)
236        && !condition_is_true(Some(new_status), condition_type)
237}
238
239fn was_ready(old_status: Option<&PostgresPolicyStatus>) -> bool {
240    condition_is_true(old_status, "Ready")
241}
242
243fn had_ready_condition(old_status: Option<&PostgresPolicyStatus>) -> bool {
244    old_status
245        .and_then(|status| condition(status, "Ready"))
246        .is_some()
247}
248
249fn ready_became_true(
250    old_status: Option<&PostgresPolicyStatus>,
251    new_status: &PostgresPolicyStatus,
252) -> bool {
253    !was_ready(old_status) && condition_is_true(Some(new_status), "Ready")
254}
255
256fn is_planned_ready(status: &PostgresPolicyStatus) -> bool {
257    condition(status, "Ready").and_then(|ready| ready.reason.as_deref()) == Some("Planned")
258}
259
260fn plan_became_clean(
261    old_status: Option<&PostgresPolicyStatus>,
262    new_status: &PostgresPolicyStatus,
263) -> bool {
264    if !is_planned_ready(new_status) || condition_is_true(Some(new_status), "Drifted") {
265        return false;
266    }
267
268    !is_planned_ready_status(old_status) || condition_is_true(old_status, "Drifted")
269}
270
271fn is_planned_ready_status(status: Option<&PostgresPolicyStatus>) -> bool {
272    status.map(is_planned_ready).unwrap_or(false)
273}
274
275fn noteworthy_failure_reason(
276    old_status: Option<&PostgresPolicyStatus>,
277    new_status: &PostgresPolicyStatus,
278) -> Option<&'static str> {
279    let ready = condition(new_status, "Ready")?;
280    if ready.status != "False" {
281        return None;
282    }
283
284    let reason = ready.reason.as_deref()?;
285    if matches!(reason, "ConflictingPolicy" | "Suspended") {
286        return None;
287    }
288
289    let mapped_reason = match reason {
290        "InvalidSpec" => "InvalidSpec",
291        "SecretMissing" | "SecretFetchFailed" => "SecretFetchFailed",
292        "DatabaseConnectionFailed" => "DatabaseConnectionFailed",
293        "GcpAuthFailed" => "GcpAuthFailed",
294        "InsufficientPrivileges" => "InsufficientPrivileges",
295        "UnsafeRoleDrops" => "UnsafeRoleDropsBlocked",
296        _ => return None,
297    };
298
299    let old_ready_status = condition_status(old_status, "Ready");
300    let old_ready_reason = condition_reason(old_status, "Ready");
301
302    if old_ready_status == Some("False") && old_ready_reason == Some(reason) {
303        None
304    } else {
305        Some(mapped_reason)
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        PostgresPolicyStatus, conflict_condition, drifted_condition, paused_condition,
        ready_condition,
    };

    /// Collect event reasons in emission order for compact assertions.
    fn reasons(events: &[Event]) -> Vec<&str> {
        events.iter().map(|event| event.reason.as_str()).collect()
    }

    #[test]
    fn emits_conflict_detected_when_conflict_condition_becomes_true() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
        status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));
        status.last_error = Some("overlap".to_string());

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["ConflictDetected"]);
    }

    #[test]
    fn emits_conflict_resolved_and_recovered_when_policy_recovers_from_conflict() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(false, "ConflictingPolicy", "overlap"));
        old_status.set_condition(conflict_condition("ConflictingPolicy", "overlap"));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert_eq!(reasons(&events), vec!["ConflictResolved", "Recovered"]);
    }

    #[test]
    fn emits_suspended_when_policy_is_paused() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(paused_condition("Reconciliation suspended by spec"));
        status.set_condition(ready_condition(
            false,
            "Suspended",
            "Reconciliation suspended by spec",
        ));

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["Suspended"]);
    }

    #[test]
    fn emits_reconciled_on_first_success() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["Reconciled"]);
    }

    #[test]
    fn emits_recovered_when_transitioning_from_not_ready_to_ready() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(
            false,
            "DatabaseConnectionFailed",
            "database unavailable",
        ));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert_eq!(reasons(&events), vec!["Recovered"]);
    }

    #[test]
    fn emits_secret_fetch_failed_when_missing_secret_first_detected() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(ready_condition(
            false,
            "SecretMissing",
            "Secret \"db\" does not contain key \"DATABASE_URL\"",
        ));
        status.last_error = Some("Secret \"db\" does not contain key \"DATABASE_URL\"".to_string());

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["SecretFetchFailed"]);
    }

    #[test]
    fn does_not_repeat_same_failure_event_without_transition() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(
            false,
            "DatabaseConnectionFailed",
            "connection refused",
        ));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(
            false,
            "DatabaseConnectionFailed",
            "connection refused",
        ));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert!(events.is_empty());
    }

    #[test]
    fn emits_insufficient_privileges_on_failure_transition() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(
            false,
            "InsufficientPrivileges",
            "permission denied to create role",
        ));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert_eq!(reasons(&events), vec!["InsufficientPrivileges"]);
    }

    #[test]
    fn emits_gcp_auth_failed_on_failure_transition() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(
            false,
            "GcpAuthFailed",
            "token request rejected",
        ));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert_eq!(reasons(&events), vec!["GcpAuthFailed"]);
        assert!(matches!(events[0].type_, EventType::Warning));
        assert_eq!(events[0].note.as_deref(), Some("token request rejected"));
    }

    #[test]
    fn emits_drift_detected_for_plan_mode_with_pending_changes() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(ready_condition(true, "Planned", "Plan computed"));
        status.set_condition(drifted_condition(
            true,
            "DriftDetected",
            "2 planned change(s) pending review",
        ));

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["DriftDetected"]);
    }

    #[test]
    fn emits_plan_clean_when_plan_mode_has_no_pending_changes() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(ready_condition(true, "Planned", "Plan computed"));
        status.set_condition(drifted_condition(false, "InSync", "No pending changes"));

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["PlanClean"]);
    }

    #[test]
    fn emits_unsafe_role_drops_blocked_when_drops_are_refused() {
        let mut status = PostgresPolicyStatus::default();
        status.set_condition(ready_condition(
            false,
            "UnsafeRoleDrops",
            "refusing to drop role \"legacy\"",
        ));

        let events = derive_status_events(None, &status);
        assert_eq!(reasons(&events), vec!["UnsafeRoleDropsBlocked"]);
        assert!(matches!(events[0].type_, EventType::Warning));
        assert_eq!(
            events[0].note.as_deref(),
            Some("refusing to drop role \"legacy\"")
        );
    }

    #[test]
    fn emits_failure_event_when_failure_reason_changes() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(
            false,
            "DatabaseConnectionFailed",
            "connection refused",
        ));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(
            false,
            "InsufficientPrivileges",
            "permission denied to create role",
        ));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert_eq!(reasons(&events), vec!["InsufficientPrivileges"]);
    }

    #[test]
    fn does_not_emit_events_while_policy_stays_ready() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(true, "Reconciled", "All changes applied"));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert!(events.is_empty());
    }

    #[test]
    fn does_not_repeat_drift_detected_while_drift_persists() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(true, "Planned", "Plan computed"));
        old_status.set_condition(drifted_condition(
            true,
            "DriftDetected",
            "2 planned change(s) pending review",
        ));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(true, "Planned", "Plan computed"));
        new_status.set_condition(drifted_condition(
            true,
            "DriftDetected",
            "2 planned change(s) pending review",
        ));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert!(events.is_empty());
    }

    #[test]
    fn emits_plan_clean_when_pending_drift_is_resolved() {
        let mut old_status = PostgresPolicyStatus::default();
        old_status.set_condition(ready_condition(true, "Planned", "Plan computed"));
        old_status.set_condition(drifted_condition(
            true,
            "DriftDetected",
            "2 planned change(s) pending review",
        ));

        let mut new_status = PostgresPolicyStatus::default();
        new_status.set_condition(ready_condition(true, "Planned", "Plan computed"));
        new_status.set_condition(drifted_condition(false, "InSync", "No pending changes"));

        let events = derive_status_events(Some(&old_status), &new_status);
        assert_eq!(reasons(&events), vec!["PlanClean"]);
    }
}