1use k8s_openapi::{
3 api::{
4 core::v1::ObjectReference,
5 events::v1::{Event as K8sEvent, EventSeries},
6 },
7 apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
8 jiff::{SignedDuration, Timestamp},
9};
10use kube_client::{
11 Client, ResourceExt,
12 api::{Api, Patch, PatchParams, PostParams},
13};
14use std::{
15 collections::HashMap,
16 hash::{Hash, Hasher},
17 sync::Arc,
18};
19use tokio::sync::RwLock;
20
21const CACHE_TTL: SignedDuration = SignedDuration::from_mins(6);
22
23pub struct Event {
27 pub type_: EventType,
31
32 pub reason: String,
36
37 pub note: Option<String>,
41
42 pub action: String,
48
49 pub secondary: Option<ObjectReference>,
67}
68
69#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
71pub enum EventType {
72 Normal,
74 Warning,
76}
77
78#[derive(Clone, Debug, PartialEq)]
82pub struct Reference(ObjectReference);
83
84impl Eq for Reference {}
85
86impl Hash for Reference {
87 fn hash<H: Hasher>(&self, state: &mut H) {
88 self.0.api_version.hash(state);
89 self.0.kind.hash(state);
90 self.0.name.hash(state);
91 self.0.namespace.hash(state);
92 self.0.uid.hash(state);
93 }
94}
95
96#[derive(Clone, Debug, PartialEq, Eq, Hash)]
98struct EventKey {
99 pub event_type: EventType,
100 pub action: String,
101 pub reason: String,
102 pub reporting_controller: String,
103 pub reporting_instance: Option<String>,
104 pub regarding: Reference,
105 pub related: Option<Reference>,
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Hash)]
119pub struct Reporter {
120 pub controller: String,
124
125 pub instance: Option<String>,
146}
147
148impl From<String> for Reporter {
150 fn from(es: String) -> Self {
151 Self {
152 controller: es,
153 instance: None,
154 }
155 }
156}
157
158impl From<&str> for Reporter {
159 fn from(es: &str) -> Self {
160 let instance = hostname::get().ok().and_then(|h| h.into_string().ok());
161 Self {
162 controller: es.into(),
163 instance,
164 }
165 }
166}
167
168#[derive(Clone)]
221pub struct Recorder {
222 client: Client,
223 reporter: Reporter,
224 cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
225}
226
227impl Recorder {
228 #[must_use]
234 pub fn new(client: Client, reporter: Reporter) -> Self {
235 let cache = Arc::default();
236 Self {
237 client,
238 reporter,
239 cache,
240 }
241 }
242
243 fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
246 EventKey {
247 event_type: ev.type_,
248 action: ev.action.clone(),
249 reason: ev.reason.clone(),
250 reporting_controller: self.reporter.controller.clone(),
251 reporting_instance: self.reporter.instance.clone(),
252 regarding: Reference(regarding.clone()),
253 related: ev.secondary.clone().map(Reference),
254 }
255 }
256
257 fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
261 let now = Timestamp::now();
262 K8sEvent {
263 action: Some(ev.action.clone()),
264 reason: Some(ev.reason.clone()),
265 deprecated_count: None,
266 deprecated_first_timestamp: None,
267 deprecated_last_timestamp: None,
268 deprecated_source: None,
269 event_time: Some(MicroTime(now)),
270 regarding: Some(reference.clone()),
271 note: ev.note.clone(),
272 metadata: ObjectMeta {
273 namespace: reference.namespace.clone(),
274 name: Some(format!(
275 "{}.{:x}",
276 reference.name.as_ref().unwrap_or(&self.reporter.controller),
277 now.as_nanosecond()
278 )),
279 ..Default::default()
280 },
281 reporting_controller: Some(self.reporter.controller.clone()),
282 reporting_instance: Some(
283 self.reporter
284 .instance
285 .clone()
286 .unwrap_or_else(|| self.reporter.controller.clone()),
287 ),
288 series: None,
289 type_: match ev.type_ {
290 EventType::Normal => Some("Normal".into()),
291 EventType::Warning => Some("Warning".into()),
292 },
293 related: ev.secondary.clone(),
294 }
295 }
296
297 pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
309 let now = Timestamp::now();
310
311 self.cache.write().await.retain(|_, v| {
313 if let Some(series) = v.series.as_ref() {
314 series.last_observed_time.0 + CACHE_TTL > now
315 } else if let Some(event_time) = v.event_time.as_ref() {
316 event_time.0 + CACHE_TTL > now
317 } else {
318 true
319 }
320 });
321
322 let key = self.get_event_key(ev, reference);
323 let event = match self.cache.read().await.get(&key) {
324 Some(e) => {
325 let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
326 let series = EventSeries {
327 count,
328 last_observed_time: MicroTime(now),
329 };
330 let mut event = e.clone();
331 event.series = Some(series);
332 event
333 }
334 None => self.generate_event(ev, reference),
335 };
336
337 let events = Api::namespaced(
338 self.client.clone(),
339 reference.namespace.as_ref().unwrap_or(&"default".to_string()),
340 );
341 if event.series.is_some() {
342 events
343 .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
344 .await?;
345 } else {
346 events.create(&PostParams::default(), &event).await?;
347 }
348
349 {
350 let mut cache = self.cache.write().await;
351 cache.insert(key, event);
352 }
353 Ok(())
354 }
355}
356
357#[cfg(test)]
358mod test {
359 use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};
360
361 use k8s_openapi::{
362 api::{
363 core::v1::{ComponentStatus, Service},
364 events::v1::Event as K8sEvent,
365 },
366 apimachinery::pkg::apis::meta::v1::MicroTime,
367 jiff::{SignedDuration, Timestamp},
368 };
369 use kube::{Api, Client, Resource};
370
371 #[tokio::test]
372 #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
373 async fn event_recorder_attaches_events() -> Result<(), Box<dyn std::error::Error>> {
374 let client = Client::try_default().await?;
375
376 let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
377 let s = svcs.get("kubernetes").await?; let recorder = Recorder::new(client.clone(), "kube".into());
379 recorder
380 .publish(
381 &Event {
382 type_: EventType::Normal,
383 reason: "VeryCoolService".into(),
384 note: Some("Sending kubernetes to detention".into()),
385 action: "Test event - plz ignore".into(),
386 secondary: None,
387 },
388 &s.object_ref(&()),
389 )
390 .await?;
391 let events: Api<K8sEvent> = Api::namespaced(client, "default");
392
393 let event_list = events.list(&Default::default()).await?;
394 let found_event = event_list
395 .into_iter()
396 .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
397 .unwrap();
398 assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
399
400 recorder
401 .publish(
402 &Event {
403 type_: EventType::Normal,
404 reason: "VeryCoolService".into(),
405 note: Some("Sending kubernetes to detention twice".into()),
406 action: "Test event - plz ignore".into(),
407 secondary: None,
408 },
409 &s.object_ref(&()),
410 )
411 .await?;
412
413 let event_list = events.list(&Default::default()).await?;
414 let found_event = event_list
415 .into_iter()
416 .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
417 .unwrap();
418 assert!(found_event.series.is_some());
419
420 Ok(())
421 }
422
423 #[tokio::test]
424 #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
425 async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box<dyn std::error::Error>> {
426 let client = Client::try_default().await?;
427
428 let component_status_api: Api<ComponentStatus> = Api::all(client.clone());
429 let s = component_status_api.get("scheduler").await?;
430 let recorder = Recorder::new(client.clone(), "kube".into());
431 recorder
432 .publish(
433 &Event {
434 type_: EventType::Normal,
435 reason: "VeryCoolServiceNoNamespace".into(),
436 note: Some("Sending kubernetes to detention without namespace".into()),
437 action: "Test event - plz ignore".into(),
438 secondary: None,
439 },
440 &s.object_ref(&()),
441 )
442 .await?;
443 let events: Api<K8sEvent> = Api::namespaced(client, "default");
444
445 let event_list = events.list(&Default::default()).await?;
446 let found_event = event_list
447 .into_iter()
448 .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
449 .unwrap();
450 assert_eq!(
451 found_event.note.unwrap(),
452 "Sending kubernetes to detention without namespace"
453 );
454
455 recorder
456 .publish(
457 &Event {
458 type_: EventType::Normal,
459 reason: "VeryCoolServiceNoNamespace".into(),
460 note: Some("Sending kubernetes to detention without namespace twice".into()),
461 action: "Test event - plz ignore".into(),
462 secondary: None,
463 },
464 &s.object_ref(&()),
465 )
466 .await?;
467
468 let event_list = events.list(&Default::default()).await?;
469 let found_event = event_list
470 .into_iter()
471 .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
472 .unwrap();
473 assert!(found_event.series.is_some());
474 Ok(())
475 }
476
477 #[tokio::test]
478 #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
479 async fn event_recorder_cache_retain() -> Result<(), Box<dyn std::error::Error>> {
480 let client = Client::try_default().await?;
481
482 let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
483 let s = svcs.get("kubernetes").await?; let reference = s.object_ref(&());
486 let reporter: Reporter = "kube".into();
487 let ev = Event {
488 type_: EventType::Normal,
489 reason: "TestCacheTtl".into(),
490 note: Some("Sending kubernetes to detention".into()),
491 action: "Test event - plz ignore".into(),
492 secondary: None,
493 };
494 let key = EventKey {
495 event_type: ev.type_,
496 action: ev.action.clone(),
497 reason: ev.reason.clone(),
498 reporting_controller: reporter.controller.clone(),
499 regarding: Reference(reference.clone()),
500 reporting_instance: None,
501 related: None,
502 };
503
504 let reporter = Reporter {
505 controller: "kube".into(),
506 instance: None,
507 };
508 let recorder = Recorder::new(client.clone(), reporter);
509
510 recorder.publish(&ev, &s.object_ref(&())).await?;
511 let now = Timestamp::now();
512 let past = now - SignedDuration::from_mins(10);
513 recorder.cache.write().await.entry(key).and_modify(|e| {
514 e.event_time = Some(MicroTime(past));
515 });
516
517 recorder.publish(&ev, &s.object_ref(&())).await?;
518
519 let events: Api<K8sEvent> = Api::namespaced(client, "default");
520 let event_list = events.list(&Default::default()).await?;
521 let found_event = event_list
522 .into_iter()
523 .find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl")))
524 .unwrap();
525 assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
526 assert!(found_event.series.is_none());
527
528 Ok(())
529 }
530}