use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event as CoreEvent},
apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
chrono::Utc,
};
use kube_client::{
api::{Api, PostParams},
Client,
};
pub struct Event {
pub type_: EventType,
pub reason: String,
pub note: Option<String>,
pub action: String,
pub secondary: Option<ObjectReference>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum EventType {
Normal,
Warning,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Reporter {
pub controller: String,
pub instance: Option<String>,
}
impl From<String> for Reporter {
fn from(es: String) -> Self {
Self {
controller: es,
instance: None,
}
}
}
impl From<&str> for Reporter {
fn from(es: &str) -> Self {
Self {
controller: es.into(),
instance: None,
}
}
}
#[derive(Clone)]
pub struct Recorder {
events: Api<CoreEvent>,
reporter: Reporter,
reference: ObjectReference,
}
impl Recorder {
#[must_use]
pub fn new(client: Client, reporter: Reporter, reference: ObjectReference) -> Self {
let default_namespace = "kube-system".to_owned(); let events = Api::namespaced(client, reference.namespace.as_ref().unwrap_or(&default_namespace));
Self {
events,
reporter,
reference,
}
}
pub async fn publish(&self, ev: Event) -> Result<(), kube_client::Error> {
self.events
.create(&PostParams::default(), &CoreEvent {
action: Some(ev.action),
reason: Some(ev.reason),
deprecated_count: None,
deprecated_first_timestamp: None,
deprecated_last_timestamp: None,
deprecated_source: None,
event_time: MicroTime(Utc::now()),
regarding: Some(self.reference.clone()),
note: ev.note.map(Into::into),
metadata: ObjectMeta {
namespace: self.reference.namespace.clone(),
generate_name: Some(format!("{}-", self.reporter.controller)),
..Default::default()
},
reporting_controller: Some(self.reporter.controller.clone()),
reporting_instance: Some(
self.reporter
.instance
.clone()
.unwrap_or_else(|| self.reporter.controller.clone()),
),
series: None,
type_: match ev.type_ {
EventType::Normal => Some("Normal".into()),
EventType::Warning => Some("Warning".into()),
},
related: ev.secondary,
})
.await?;
Ok(())
}
}
#[cfg(test)]
mod test {
#![allow(unused_imports)]
use k8s_openapi::api::{
core::v1::{Event as CoreEvent, Service},
rbac::v1::ClusterRole,
};
use kube_client::{Api, Client, Resource};
use super::{Event, EventType, Recorder};
#[tokio::test]
#[ignore] async fn event_recorder_attaches_events() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
let s = svcs.get("kubernetes").await?; let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&()));
recorder
.publish(Event {
type_: EventType::Normal,
reason: "VeryCoolService".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
})
.await?;
let events: Api<CoreEvent> = Api::namespaced(client, "default");
let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
.unwrap();
assert_eq!(found_event.message.unwrap(), "Sending kubernetes to detention");
Ok(())
}
#[tokio::test]
#[ignore] async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let svcs: Api<ClusterRole> = Api::all(client.clone());
let s = svcs.get("system:basic-user").await?; let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&()));
recorder
.publish(Event {
type_: EventType::Normal,
reason: "VeryCoolServiceNoNamespace".into(),
note: Some("Sending kubernetes to detention without namespace".into()),
action: "Test event - plz ignore".into(),
secondary: None,
})
.await?;
let events: Api<CoreEvent> = Api::namespaced(client, "kube-system");
let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
.unwrap();
assert_eq!(
found_event.message.unwrap(),
"Sending kubernetes to detention without namespace"
);
Ok(())
}
}