pub mod types;
mod recorder;
pub use recorder::EventRecorder;
pub use types::EventData;
pub use types::EventType;
pub use types::Reason;
use async_trait::async_trait;
use kube::Resource;
use crate::error::Result;
use crate::TryResource;
/// Emits events of reason type `R` about a resource.
///
/// Implementors supply the fallible [`EmitEvent::try_emit`]; the provided
/// [`EmitEvent::emit`] wraps it and never returns an error to the caller.
#[async_trait]
pub trait EmitEvent<R>: Send + Sync
where
    R: Reason,
{
    /// Emits `event` for `object`, reporting any failure to the caller.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation produces while emitting.
    async fn try_emit<K>(&self, object: &K, event: EventData<R>) -> Result<()>
    where
        K: Resource<DynamicType = ()> + TryResource + Clone + Send + Sync;

    /// Emits `event` for `object` on a best-effort basis.
    ///
    /// A failure from [`EmitEvent::try_emit`] is logged at `warn` level
    /// together with the event's reason and is otherwise discarded.
    async fn emit<K>(&self, object: &K, event: EventData<R>)
    where
        K: Resource<DynamicType = ()> + TryResource + Clone + Send + Sync,
    {
        // Take an owned reason up front: `event` is moved into `try_emit`
        // and is no longer available when the warning is written.
        let logged_reason = event.reason.to_owned();
        let outcome = self.try_emit(object, event).await;
        match outcome {
            Ok(()) => {}
            Err(failure) => {
                tracing::warn!(
                    error = %failure,
                    reason = %logged_reason,
                    "Failed to emit event"
                );
            }
        }
    }
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::error::Error;
use crate::events::types::EventType;
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use std::sync::Arc;
use std::sync::Mutex;
use strum::AsRefStr;
use strum::Display;
/// Event reasons used by the tests in this module.
///
/// `Display` and `AsRefStr` are derived via `strum`, so each variant can be
/// rendered as, or borrowed as, a string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRefStr)]
pub enum TestEventReason {
ResourceCreated,
ResourceUpdated,
ResourceDeleted,
ReconciliationFailed,
}
// Empty impl: no `Reason` items are defined by the implementor here.
impl Reason for TestEventReason {}
/// One entry stored by the mock recorders in this module.
#[derive(Debug, Clone)]
pub struct RecordedEvent<R: Reason> {
/// Name of the resource the event was emitted for.
pub resource_name: String,
/// Reason carried by the emitted event.
pub reason: R,
/// Message of the most recent emission merged into this entry.
pub message: String,
/// How many emissions with this resource name and reason were merged into
/// this entry; the mocks start it at 1 and increment it on each repeat.
pub count: i32,
}
/// In-memory event recorder for tests.
///
/// The event list lives behind an `Arc<Mutex<..>>`, so clones of a recorder
/// observe and modify the same list.
#[derive(Clone)]
pub struct MockEventRecorder<R: Reason> {
// Shared, lock-protected list of everything recorded so far.
events: Arc<Mutex<Vec<RecordedEvent<R>>>>,
}
impl<R: Reason> Default for MockEventRecorder<R> {
fn default() -> Self {
Self::new()
}
}
impl<R: Reason> MockEventRecorder<R> {
pub fn new() -> Self {
Self {
events: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn events(&self) -> Vec<RecordedEvent<R>> {
self.events.lock().unwrap().clone()
}
pub fn event_count(&self) -> usize {
self.events.lock().unwrap().len()
}
pub fn clear(&self) {
self.events.lock().unwrap().clear();
}
}
#[async_trait]
impl<R: Reason> EmitEvent<R> for MockEventRecorder<R> {
    /// Records `event` under the name of `object`.
    ///
    /// If an entry with the same resource name and reason already exists, its
    /// `count` is incremented and its message is overwritten; otherwise a new
    /// entry with `count` 1 is appended.
    ///
    /// # Errors
    ///
    /// Propagates the error from `object.try_name()`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    async fn try_emit<K>(&self, object: &K, event: EventData<R>) -> Result<()>
    where
        K: Resource<DynamicType = ()> + TryResource + Clone + Send + Sync,
    {
        let resource_name = object.try_name()?.to_owned();
        let wanted_reason = event.reason.as_ref().to_owned();

        let mut recorded = self.events.lock().unwrap();
        let duplicate = recorded.iter_mut().find(|entry| {
            entry.resource_name == resource_name && entry.reason.as_ref() == wanted_reason
        });

        match duplicate {
            // Repeat of a known (resource, reason) pair: merge into it.
            Some(entry) => {
                entry.count += 1;
                entry.message = event.message;
            }
            // First time this (resource, reason) pair is seen.
            None => recorded.push(RecordedEvent {
                resource_name,
                reason: event.reason,
                message: event.message,
                count: 1,
            }),
        }
        Ok(())
    }
}
struct FailingMockEventRecorder<R: Reason> {
should_fail: bool,
events: Arc<Mutex<Vec<RecordedEvent<R>>>>,
}
impl<R: Reason> FailingMockEventRecorder<R> {
fn new_succeeding() -> Self {
Self {
should_fail: false,
events: Arc::new(Mutex::new(Vec::new())),
}
}
fn new_failing() -> Self {
Self {
should_fail: true,
events: Arc::new(Mutex::new(Vec::new())),
}
}
fn events(&self) -> Vec<RecordedEvent<R>> {
self.events.lock().unwrap().clone()
}
}
#[async_trait]
impl<R: Reason> EmitEvent<R> for FailingMockEventRecorder<R> {
async fn try_emit<K>(&self, object: &K, event: EventData<R>) -> Result<()>
where
K: Resource<DynamicType = ()> + TryResource + Clone + Send + Sync,
{
if self.should_fail {
return Err(Error::EmitEventFailed("Simulated failure".to_string()));
}
let name = object.try_name()?.to_owned();
let reason_str = event.reason.as_ref().to_owned();
let mut events = self.events.lock().unwrap();
if let Some(existing) = events
.iter_mut()
.find(|e| e.resource_name == name && e.reason.as_ref() == reason_str)
{
existing.count += 1;
existing.message = event.message;
return Ok(());
}
events.push(RecordedEvent {
resource_name: name,
reason: event.reason,
message: event.message,
count: 1,
});
Ok(())
}
}
fn create_test_resource() -> ConfigMap {
ConfigMap {
metadata: ObjectMeta {
name: Some("test-resource".to_string()),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
}
}
// A single `try_emit` on the mock stores exactly one entry carrying the
// resource name, reason, message and a count of 1.
#[tokio::test]
async fn test_mock_event_recorder() {
    let config_map = create_test_resource();
    let mock = MockEventRecorder::<TestEventReason>::new();
    let event = EventData::normal(TestEventReason::ResourceCreated, "Resource was created");

    mock.try_emit(&config_map, event).await.unwrap();

    let recorded = mock.events();
    assert_eq!(recorded.len(), 1);
    let only = &recorded[0];
    assert_eq!(only.resource_name, "test-resource");
    assert_eq!(only.reason, TestEventReason::ResourceCreated);
    assert_eq!(only.message, "Resource was created");
    assert_eq!(only.count, 1);
}
// `EventData::normal` yields a `Normal` event with the given reason and
// message and no action.
#[test]
fn test_normal_event_creation() {
    let built = EventData::normal(
        TestEventReason::ResourceCreated,
        "Resource has been created successfully",
    );

    assert_eq!(built.type_, EventType::Normal);
    assert_eq!(built.reason, TestEventReason::ResourceCreated);
    assert_eq!(built.message, "Resource has been created successfully");
    assert_eq!(built.action, None);
}
// `EventData::warning` yields a `Warning` event with the given reason and
// message and no action.
#[test]
fn test_warning_event_creation() {
    let built = EventData::warning(
        TestEventReason::ReconciliationFailed,
        "Failed to reconcile resource configuration",
    );

    assert_eq!(built.type_, EventType::Warning);
    assert_eq!(built.reason, TestEventReason::ReconciliationFailed);
    assert_eq!(built.message, "Failed to reconcile resource configuration");
    assert_eq!(built.action, None);
}
// `with_action` stores the supplied action on the event.
#[test]
fn test_event_with_action() {
    let with_action = EventData::normal(TestEventReason::ResourceCreated, "Resource created")
        .with_action("CreateResource");

    assert_eq!(with_action.action, Some("CreateResource".to_string()));
}
// The message argument may be an owned `String`, not only a `&str`.
#[test]
fn test_event_message_accepts_string() {
    let owned_message = String::from("Dynamic message");

    let built = EventData::normal(TestEventReason::ResourceCreated, owned_message);

    assert_eq!(built.message, "Dynamic message");
}
// With failure disabled, `try_emit` returns `Ok` and the event is stored.
#[tokio::test]
async fn test_try_emit_success_returns_ok() {
    let config_map = create_test_resource();
    let recorder = FailingMockEventRecorder::new_succeeding();
    let event = EventData::normal(TestEventReason::ResourceCreated, "Test message");

    let outcome = recorder.try_emit(&config_map, event).await;
    assert!(outcome.is_ok());

    let stored = recorder.events();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].resource_name, "test-resource");
    assert_eq!(stored[0].message, "Test message");
}
// With failure enabled, `try_emit` surfaces the simulated error.
#[tokio::test]
async fn test_try_emit_failure_returns_error() {
    let config_map = create_test_resource();
    let recorder = FailingMockEventRecorder::new_failing();
    let event = EventData::normal(TestEventReason::ResourceCreated, "Test message");

    let outcome = recorder.try_emit(&config_map, event).await;

    assert!(outcome.is_err());
    let rendered = outcome.unwrap_err().to_string();
    assert!(rendered.contains("Simulated failure"));
}
// `emit` on a succeeding recorder stores the event.
#[tokio::test]
async fn test_emit_success_completes_without_error() {
    let config_map = create_test_resource();
    let recorder = FailingMockEventRecorder::new_succeeding();
    let event = EventData::normal(TestEventReason::ResourceCreated, "Test message");

    recorder.emit(&config_map, event).await;

    let stored = recorder.events();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].resource_name, "test-resource");
}
// `emit` on a failing recorder returns normally and stores nothing.
#[tokio::test]
async fn test_emit_failure_swallows_error_and_continues() {
    let config_map = create_test_resource();
    let recorder = FailingMockEventRecorder::new_failing();
    let event = EventData::normal(TestEventReason::ResourceCreated, "Test message");

    recorder.emit(&config_map, event).await;

    let stored = recorder.events();
    assert!(stored.is_empty());
}
// The default `emit` goes through `try_emit`: the reason and message passed
// to `emit` show up in what the recorder's `try_emit` stored.
#[tokio::test]
async fn test_emit_calls_try_emit_internally() {
    let config_map = create_test_resource();
    let recorder = FailingMockEventRecorder::new_succeeding();
    let event = EventData::normal(TestEventReason::ResourceUpdated, "Update message");

    recorder.emit(&config_map, event).await;

    let stored = recorder.events();
    assert_eq!(stored.len(), 1);
    let only = &stored[0];
    assert_eq!(only.reason, TestEventReason::ResourceUpdated);
    assert_eq!(only.message, "Update message");
}
// Events with three distinct reasons for one resource are stored as three
// separate entries, in emission order.
#[tokio::test]
async fn test_multiple_different_events_are_recorded() {
    let config_map = create_test_resource();
    let mock = MockEventRecorder::<TestEventReason>::new();
    let emitted = [
        (TestEventReason::ResourceCreated, "Created"),
        (TestEventReason::ResourceUpdated, "Updated"),
        (TestEventReason::ResourceDeleted, "Deleted"),
    ];

    for (reason, message) in emitted.iter() {
        mock.emit(&config_map, EventData::normal(*reason, *message))
            .await;
    }

    let recorded = mock.events();
    assert_eq!(recorded.len(), 3);
    for (entry, (reason, _)) in recorded.iter().zip(emitted.iter()) {
        assert_eq!(entry.reason, *reason);
    }
}
// `clear` empties the recorder after an event has been stored.
#[tokio::test]
async fn test_clear_removes_all_events() {
    let config_map = create_test_resource();
    let mock = MockEventRecorder::<TestEventReason>::new();
    let event = EventData::normal(TestEventReason::ResourceCreated, "Created");

    mock.emit(&config_map, event).await;
    let before_clear = mock.events().len();
    assert_eq!(before_clear, 1);

    mock.clear();
    let after_clear = mock.events().len();
    assert_eq!(after_clear, 0);
}
// Two emissions with the same resource and reason collapse into one entry
// that keeps the latest message and a count of 2.
#[tokio::test]
async fn test_duplicate_events_are_deduplicated() {
    let config_map = create_test_resource();
    let mock = MockEventRecorder::<TestEventReason>::new();

    for message in ["First message", "Second message"].iter() {
        let event = EventData::normal(TestEventReason::ResourceCreated, *message);
        mock.emit(&config_map, event).await;
    }

    let recorded = mock.events();
    assert_eq!(recorded.len(), 1);
    let merged = &recorded[0];
    assert_eq!(merged.reason, TestEventReason::ResourceCreated);
    assert_eq!(merged.message, "Second message");
    assert_eq!(merged.count, 2);
}
// Events for the same resource but with different reasons stay separate,
// each with a count of 1.
#[tokio::test]
async fn test_different_reasons_not_deduplicated() {
    let config_map = create_test_resource();
    let mock = MockEventRecorder::<TestEventReason>::new();
    let created = EventData::normal(TestEventReason::ResourceCreated, "Created");
    let failed = EventData::warning(TestEventReason::ReconciliationFailed, "Failed");

    mock.emit(&config_map, created).await;
    mock.emit(&config_map, failed).await;

    let recorded = mock.events();
    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded[0].count, 1);
    assert_eq!(recorded[1].count, 1);
}
// The same reason emitted for two differently named resources produces two
// entries, one per resource.
#[tokio::test]
async fn test_different_resources_not_deduplicated() {
    // Both fixtures differ only in their name.
    let config_map_named = |name: &str| ConfigMap {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            namespace: Some("default".to_string()),
            ..Default::default()
        },
        ..Default::default()
    };
    let first = config_map_named("resource-1");
    let second = config_map_named("resource-2");
    let mock = MockEventRecorder::<TestEventReason>::new();

    mock.emit(
        &first,
        EventData::normal(TestEventReason::ResourceCreated, "Created 1"),
    )
    .await;
    mock.emit(
        &second,
        EventData::normal(TestEventReason::ResourceCreated, "Created 2"),
    )
    .await;

    let recorded = mock.events();
    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded[0].resource_name, "resource-1");
    assert_eq!(recorded[1].resource_name, "resource-2");
}
// `event_count` counts stored entries, so a repeated (resource, reason) pair
// contributes only once: three emissions, two distinct reasons, count 2.
#[tokio::test]
async fn test_event_count_returns_unique_count() {
    let config_map = create_test_resource();
    let mock = MockEventRecorder::<TestEventReason>::new();
    let emitted = [
        (TestEventReason::ResourceCreated, "First"),
        (TestEventReason::ResourceCreated, "Second"),
        (TestEventReason::ResourceUpdated, "Updated"),
    ];

    for (reason, message) in emitted.iter() {
        mock.emit(&config_map, EventData::normal(*reason, *message))
            .await;
    }

    assert_eq!(mock.event_count(), 2);
}
}