// k8s_operator_controller/events.rs
use k8s_openapi::api::core::v1::Event;
2use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
3use k8s_openapi::chrono::Utc;
4use kube::api::{ObjectMeta, PostParams, Resource};
5use kube::{Api, Client};
6use tracing::debug;
7
8use k8s_operator_core::OperatorError;
9
/// Kind of event to record: an ordinary notification or a warning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventType {
    /// Rendered as the string `"Normal"`.
    Normal,
    /// Rendered as the string `"Warning"`.
    Warning,
}

impl EventType {
    /// Returns the static string form of this variant, as written into the
    /// `type` field of a recorded event.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Normal => "Normal",
            Self::Warning => "Warning",
        }
    }
}
24
25pub struct EventRecorder {
26 client: Client,
27 component: String,
28 namespace: String,
29}
30
31impl EventRecorder {
32 pub fn new(client: Client, component: impl Into<String>, namespace: impl Into<String>) -> Self {
33 Self {
34 client,
35 component: component.into(),
36 namespace: namespace.into(),
37 }
38 }
39
40 pub async fn record<K>(
41 &self,
42 resource: &K,
43 event_type: EventType,
44 reason: impl Into<String>,
45 message: impl Into<String>,
46 ) -> k8s_operator_core::Result<()>
47 where
48 K: Resource,
49 K::DynamicType: Default,
50 {
51 let reason = reason.into();
52 let message = message.into();
53
54 let name = resource.meta().name.clone().unwrap_or_default();
55 let uid = resource.meta().uid.clone().unwrap_or_default();
56
57 let event_name = format!(
58 "{}.{}.{}",
59 name,
60 self.component.replace('/', "-"),
61 Utc::now().timestamp_millis()
62 );
63
64 let event = Event {
65 metadata: ObjectMeta {
66 name: Some(event_name),
67 namespace: Some(self.namespace.clone()),
68 ..Default::default()
69 },
70 involved_object: k8s_openapi::api::core::v1::ObjectReference {
71 api_version: Some(K::api_version(&K::DynamicType::default()).to_string()),
72 kind: Some(K::kind(&K::DynamicType::default()).to_string()),
73 name: Some(name.clone()),
74 namespace: Some(self.namespace.clone()),
75 uid: Some(uid),
76 ..Default::default()
77 },
78 reason: Some(reason),
79 message: Some(message),
80 type_: Some(event_type.as_str().to_string()),
81 first_timestamp: Some(Time(Utc::now())),
82 last_timestamp: Some(Time(Utc::now())),
83 count: Some(1),
84 reporting_component: Some(self.component.clone()),
85 reporting_instance: Some(self.component.clone()),
86 ..Default::default()
87 };
88
89 let events: Api<Event> = Api::namespaced(self.client.clone(), &self.namespace);
90 events
91 .create(&PostParams::default(), &event)
92 .await
93 .map_err(|e| OperatorError::KubeError(e))?;
94
95 debug!(
96 "Recorded event: {} - {} for {}",
97 event_type.as_str(),
98 event.reason.unwrap_or_default(),
99 name
100 );
101
102 Ok(())
103 }
104
105 pub async fn normal<K>(
106 &self,
107 resource: &K,
108 reason: impl Into<String>,
109 message: impl Into<String>,
110 ) -> k8s_operator_core::Result<()>
111 where
112 K: Resource,
113 K::DynamicType: Default,
114 {
115 self.record(resource, EventType::Normal, reason, message)
116 .await
117 }
118
119 pub async fn warning<K>(
120 &self,
121 resource: &K,
122 reason: impl Into<String>,
123 message: impl Into<String>,
124 ) -> k8s_operator_core::Result<()>
125 where
126 K: Resource,
127 K::DynamicType: Default,
128 {
129 self.record(resource, EventType::Warning, reason, message)
130 .await
131 }
132}