use std::collections::{HashMap, VecDeque};
use tokio::sync::broadcast;
use crate::channels::{ComponentEvent, ComponentStatus};
pub const DEFAULT_MAX_EVENTS_PER_COMPONENT: usize = 100;
pub const DEFAULT_EVENT_CHANNEL_CAPACITY: usize = 256;
struct ComponentEventChannel {
history: VecDeque<ComponentEvent>,
max_history: usize,
sender: broadcast::Sender<ComponentEvent>,
}
impl ComponentEventChannel {
fn new(max_history: usize, channel_capacity: usize) -> Self {
let (sender, _) = broadcast::channel(channel_capacity);
Self {
history: VecDeque::with_capacity(max_history),
max_history,
sender,
}
}
fn record(&mut self, event: ComponentEvent) {
if self.history.len() >= self.max_history {
self.history.pop_front();
}
self.history.push_back(event.clone());
let _ = self.sender.send(event);
}
fn get_history(&self) -> Vec<ComponentEvent> {
self.history.iter().cloned().collect()
}
fn subscribe(&self) -> broadcast::Receiver<ComponentEvent> {
self.sender.subscribe()
}
fn get_last_error(&self) -> Option<String> {
self.history
.iter()
.rev()
.find(|event| event.status == ComponentStatus::Error)
.and_then(|event| event.message.clone())
}
}
pub struct ComponentEventHistory {
channels: HashMap<String, ComponentEventChannel>,
max_events_per_component: usize,
channel_capacity: usize,
}
impl std::fmt::Debug for ComponentEventHistory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ComponentEventHistory")
.field("max_events_per_component", &self.max_events_per_component)
.field("channel_capacity", &self.channel_capacity)
.field("component_count", &self.channels.len())
.finish()
}
}
impl Default for ComponentEventHistory {
fn default() -> Self {
Self::new()
}
}
impl ComponentEventHistory {
pub fn new() -> Self {
Self {
channels: HashMap::new(),
max_events_per_component: DEFAULT_MAX_EVENTS_PER_COMPONENT,
channel_capacity: DEFAULT_EVENT_CHANNEL_CAPACITY,
}
}
pub fn with_capacity(max_events_per_component: usize, channel_capacity: usize) -> Self {
Self {
channels: HashMap::new(),
max_events_per_component,
channel_capacity,
}
}
pub fn record_event(&mut self, event: ComponentEvent) {
let component_id = event.component_id.clone();
let channel = self.channels.entry(component_id).or_insert_with(|| {
ComponentEventChannel::new(self.max_events_per_component, self.channel_capacity)
});
channel.record(event);
}
pub fn get_events(&self, component_id: &str) -> Vec<ComponentEvent> {
self.channels
.get(component_id)
.map(|channel| channel.get_history())
.unwrap_or_default()
}
pub fn subscribe(
&mut self,
component_id: &str,
) -> (Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>) {
let channel = self
.channels
.entry(component_id.to_string())
.or_insert_with(|| {
ComponentEventChannel::new(self.max_events_per_component, self.channel_capacity)
});
let history = channel.get_history();
let receiver = channel.subscribe();
(history, receiver)
}
pub fn try_subscribe(
&self,
component_id: &str,
) -> Option<(Vec<ComponentEvent>, broadcast::Receiver<ComponentEvent>)> {
let channel = self.channels.get(component_id)?;
Some((channel.get_history(), channel.subscribe()))
}
pub fn get_last_error(&self, component_id: &str) -> Option<String> {
self.channels
.get(component_id)
.and_then(|channel| channel.get_last_error())
}
pub fn get_all_events(&self) -> Vec<ComponentEvent> {
let mut all_events: Vec<ComponentEvent> = self
.channels
.values()
.flat_map(|channel| channel.get_history())
.collect();
all_events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
all_events
}
pub fn remove_component(&mut self, component_id: &str) {
self.channels.remove(component_id);
}
pub fn event_count(&self, component_id: &str) -> usize {
self.channels
.get(component_id)
.map(|channel| channel.history.len())
.unwrap_or(0)
}
pub fn total_event_count(&self) -> usize {
self.channels
.values()
.map(|channel| channel.history.len())
.sum()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::channels::ComponentType;
use chrono::Utc;
fn create_test_event(
component_id: &str,
status: ComponentStatus,
message: Option<&str>,
) -> ComponentEvent {
ComponentEvent {
component_id: component_id.to_string(),
component_type: ComponentType::Source,
status,
timestamp: Utc::now(),
message: message.map(|s| s.to_string()),
}
}
#[test]
fn test_record_and_get_events() {
let mut history = ComponentEventHistory::new();
let event1 = create_test_event("source1", ComponentStatus::Starting, None);
let event2 = create_test_event("source1", ComponentStatus::Running, None);
history.record_event(event1);
history.record_event(event2);
let events = history.get_events("source1");
assert_eq!(events.len(), 2);
assert_eq!(events[0].status, ComponentStatus::Starting);
assert_eq!(events[1].status, ComponentStatus::Running);
}
#[test]
fn test_max_events_per_component() {
let mut history = ComponentEventHistory::with_capacity(3, 10);
for i in 0..5 {
let event = create_test_event(
"source1",
ComponentStatus::Running,
Some(&format!("event {i}")),
);
history.record_event(event);
}
let events = history.get_events("source1");
assert_eq!(events.len(), 3);
assert_eq!(events[0].message, Some("event 2".to_string()));
assert_eq!(events[2].message, Some("event 4".to_string()));
}
#[test]
fn test_get_last_error() {
let mut history = ComponentEventHistory::new();
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event(
"source1",
ComponentStatus::Error,
Some("First error"),
));
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event(
"source1",
ComponentStatus::Error,
Some("Second error"),
));
history.record_event(create_test_event("source1", ComponentStatus::Running, None));
let last_error = history.get_last_error("source1");
assert_eq!(last_error, Some("Second error".to_string()));
}
#[test]
fn test_get_last_error_none() {
let mut history = ComponentEventHistory::new();
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event("source1", ComponentStatus::Running, None));
let last_error = history.get_last_error("source1");
assert!(last_error.is_none());
}
#[test]
fn test_get_all_events() {
let mut history = ComponentEventHistory::new();
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event("query1", ComponentStatus::Starting, None));
history.record_event(create_test_event("source1", ComponentStatus::Running, None));
let all_events = history.get_all_events();
assert_eq!(all_events.len(), 3);
}
#[test]
fn test_remove_component() {
let mut history = ComponentEventHistory::new();
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event(
"source2",
ComponentStatus::Starting,
None,
));
history.remove_component("source1");
assert_eq!(history.get_events("source1").len(), 0);
assert_eq!(history.get_events("source2").len(), 1);
}
#[test]
fn test_event_count() {
let mut history = ComponentEventHistory::new();
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event("source1", ComponentStatus::Running, None));
history.record_event(create_test_event(
"source2",
ComponentStatus::Starting,
None,
));
assert_eq!(history.event_count("source1"), 2);
assert_eq!(history.event_count("source2"), 1);
assert_eq!(history.event_count("nonexistent"), 0);
assert_eq!(history.total_event_count(), 3);
}
#[test]
fn test_subscribe_gets_history() {
let mut history = ComponentEventHistory::new();
history.record_event(create_test_event(
"source1",
ComponentStatus::Starting,
None,
));
history.record_event(create_test_event("source1", ComponentStatus::Running, None));
let (events, _receiver) = history.subscribe("source1");
assert_eq!(events.len(), 2);
assert_eq!(events[0].status, ComponentStatus::Starting);
assert_eq!(events[1].status, ComponentStatus::Running);
}
#[tokio::test]
async fn test_subscribe_receives_live_events() {
let mut history = ComponentEventHistory::new();
let (_history, mut receiver) = history.subscribe("source1");
let event = create_test_event("source1", ComponentStatus::Running, Some("live event"));
history.record_event(event);
let received = receiver.try_recv().unwrap();
assert_eq!(received.status, ComponentStatus::Running);
assert_eq!(received.message, Some("live event".to_string()));
}
}