k8s_operator_controller/
events.rs

1use k8s_openapi::api::core::v1::Event;
2use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
3use k8s_openapi::chrono::Utc;
4use kube::api::{ObjectMeta, PostParams, Resource};
5use kube::{Api, Client};
6use tracing::debug;
7
8use k8s_operator_core::OperatorError;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum EventType {
12    Normal,
13    Warning,
14}
15
16impl EventType {
17    fn as_str(&self) -> &'static str {
18        match self {
19            EventType::Normal => "Normal",
20            EventType::Warning => "Warning",
21        }
22    }
23}
24
25pub struct EventRecorder {
26    client: Client,
27    component: String,
28    namespace: String,
29}
30
31impl EventRecorder {
32    pub fn new(client: Client, component: impl Into<String>, namespace: impl Into<String>) -> Self {
33        Self {
34            client,
35            component: component.into(),
36            namespace: namespace.into(),
37        }
38    }
39
40    pub async fn record<K>(
41        &self,
42        resource: &K,
43        event_type: EventType,
44        reason: impl Into<String>,
45        message: impl Into<String>,
46    ) -> k8s_operator_core::Result<()>
47    where
48        K: Resource,
49        K::DynamicType: Default,
50    {
51        let reason = reason.into();
52        let message = message.into();
53
54        let name = resource.meta().name.clone().unwrap_or_default();
55        let uid = resource.meta().uid.clone().unwrap_or_default();
56
57        let event_name = format!(
58            "{}.{}.{}",
59            name,
60            self.component.replace('/', "-"),
61            Utc::now().timestamp_millis()
62        );
63
64        let event = Event {
65            metadata: ObjectMeta {
66                name: Some(event_name),
67                namespace: Some(self.namespace.clone()),
68                ..Default::default()
69            },
70            involved_object: k8s_openapi::api::core::v1::ObjectReference {
71                api_version: Some(K::api_version(&K::DynamicType::default()).to_string()),
72                kind: Some(K::kind(&K::DynamicType::default()).to_string()),
73                name: Some(name.clone()),
74                namespace: Some(self.namespace.clone()),
75                uid: Some(uid),
76                ..Default::default()
77            },
78            reason: Some(reason),
79            message: Some(message),
80            type_: Some(event_type.as_str().to_string()),
81            first_timestamp: Some(Time(Utc::now())),
82            last_timestamp: Some(Time(Utc::now())),
83            count: Some(1),
84            reporting_component: Some(self.component.clone()),
85            reporting_instance: Some(self.component.clone()),
86            ..Default::default()
87        };
88
89        let events: Api<Event> = Api::namespaced(self.client.clone(), &self.namespace);
90        events
91            .create(&PostParams::default(), &event)
92            .await
93            .map_err(|e| OperatorError::KubeError(e))?;
94
95        debug!(
96            "Recorded event: {} - {} for {}",
97            event_type.as_str(),
98            event.reason.unwrap_or_default(),
99            name
100        );
101
102        Ok(())
103    }
104
105    pub async fn normal<K>(
106        &self,
107        resource: &K,
108        reason: impl Into<String>,
109        message: impl Into<String>,
110    ) -> k8s_operator_core::Result<()>
111    where
112        K: Resource,
113        K::DynamicType: Default,
114    {
115        self.record(resource, EventType::Normal, reason, message)
116            .await
117    }
118
119    pub async fn warning<K>(
120        &self,
121        resource: &K,
122        reason: impl Into<String>,
123        message: impl Into<String>,
124    ) -> k8s_operator_core::Result<()>
125    where
126        K: Resource,
127        K::DynamicType: Default,
128    {
129        self.record(resource, EventType::Warning, reason, message)
130            .await
131    }
132}