use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};
use crate::channels::ComponentType;
pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ComponentLogKey {
pub instance_id: String,
pub component_type: ComponentType,
pub component_id: String,
}
impl ComponentLogKey {
pub fn new(
instance_id: impl Into<String>,
component_type: ComponentType,
component_id: impl Into<String>,
) -> Self {
Self {
instance_id: instance_id.into(),
component_type,
component_id: component_id.into(),
}
}
pub fn from_str_key(key: &str) -> Option<Self> {
let parts: Vec<&str> = key.split(':').collect();
match parts.len() {
1 => None, 3 => {
let component_type = match parts[1].to_lowercase().as_str() {
"source" => ComponentType::Source,
"query" => ComponentType::Query,
"reaction" => ComponentType::Reaction,
_ => return None,
};
Some(Self {
instance_id: parts[0].to_string(),
component_type,
component_id: parts[2].to_string(),
})
}
_ => None,
}
}
pub fn to_string_key(&self) -> String {
let type_str = match self.component_type {
ComponentType::Source => "source",
ComponentType::Query => "query",
ComponentType::Reaction => "reaction",
ComponentType::BootstrapProvider => "bootstrap_provider",
ComponentType::IdentityProvider => "identity_provider",
};
format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
}
}
impl std::fmt::Display for ComponentLogKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_string_key())
}
}
pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
impl std::fmt::Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Trace => write!(f, "TRACE"),
LogLevel::Debug => write!(f, "DEBUG"),
LogLevel::Info => write!(f, "INFO"),
LogLevel::Warn => write!(f, "WARN"),
LogLevel::Error => write!(f, "ERROR"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogMessage {
pub timestamp: DateTime<Utc>,
pub level: LogLevel,
pub message: String,
pub instance_id: String,
pub component_id: String,
pub component_type: ComponentType,
}
impl LogMessage {
pub fn new(
level: LogLevel,
message: impl Into<String>,
component_id: impl Into<String>,
component_type: ComponentType,
) -> Self {
Self::with_instance(level, message, "", component_id, component_type)
}
pub fn with_instance(
level: LogLevel,
message: impl Into<String>,
instance_id: impl Into<String>,
component_id: impl Into<String>,
component_type: ComponentType,
) -> Self {
Self {
timestamp: Utc::now(),
level,
message: message.into(),
instance_id: instance_id.into(),
component_id: component_id.into(),
component_type,
}
}
pub fn key(&self) -> ComponentLogKey {
ComponentLogKey::new(
self.instance_id.clone(),
self.component_type.clone(),
self.component_id.clone(),
)
}
}
struct ComponentLogChannel {
history: VecDeque<LogMessage>,
max_history: usize,
sender: broadcast::Sender<LogMessage>,
}
impl ComponentLogChannel {
fn new(max_history: usize, channel_capacity: usize) -> Self {
let (sender, _) = broadcast::channel(channel_capacity);
Self {
history: VecDeque::with_capacity(max_history),
max_history,
sender,
}
}
fn log(&mut self, message: LogMessage) {
if self.history.len() >= self.max_history {
self.history.pop_front();
}
self.history.push_back(message.clone());
let _ = self.sender.send(message);
}
fn get_history(&self) -> Vec<LogMessage> {
self.history.iter().cloned().collect()
}
fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
self.sender.subscribe()
}
}
pub struct ComponentLogRegistry {
channels: RwLock<HashMap<String, ComponentLogChannel>>,
max_history: usize,
channel_capacity: usize,
}
impl std::fmt::Debug for ComponentLogRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ComponentLogRegistry")
.field("max_history", &self.max_history)
.field("channel_capacity", &self.channel_capacity)
.finish()
}
}
impl Default for ComponentLogRegistry {
fn default() -> Self {
Self::new()
}
}
impl ComponentLogRegistry {
pub fn new() -> Self {
Self {
channels: RwLock::new(HashMap::new()),
max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
}
}
pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
Self {
channels: RwLock::new(HashMap::new()),
max_history,
channel_capacity,
}
}
pub async fn log(&self, message: LogMessage) {
let key = message.key().to_string_key();
let mut channels = self.channels.write().await;
let channel = channels
.entry(key)
.or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
channel.log(message);
}
pub fn try_log(&self, message: LogMessage) -> bool {
match self.channels.try_write() {
Ok(mut channels) => {
let key = message.key().to_string_key();
let channel = channels.entry(key).or_insert_with(|| {
ComponentLogChannel::new(self.max_history, self.channel_capacity)
});
channel.log(message);
true
}
Err(_) => false,
}
}
pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
let channels = self.channels.read().await;
channels
.get(&key.to_string_key())
.map(|c| c.get_history())
.unwrap_or_default()
}
pub async fn subscribe_by_key(
&self,
key: &ComponentLogKey,
) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
let mut channels = self.channels.write().await;
let channel = channels
.entry(key.to_string_key())
.or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
let history = channel.get_history();
let receiver = channel.subscribe();
(history, receiver)
}
pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
self.channels.write().await.remove(&key.to_string_key());
}
pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
self.channels
.read()
.await
.get(&key.to_string_key())
.map(|c| c.history.len())
.unwrap_or(0)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
ComponentLogKey::new(instance, component_type, component)
}
#[tokio::test]
async fn test_log_and_get_history() {
let registry = ComponentLogRegistry::new();
let msg1 = LogMessage::with_instance(
LogLevel::Info,
"First message",
"instance1",
"source1",
ComponentType::Source,
);
let msg2 = LogMessage::with_instance(
LogLevel::Error,
"Second message",
"instance1",
"source1",
ComponentType::Source,
);
registry.log(msg1).await;
registry.log(msg2).await;
let key = make_key("instance1", ComponentType::Source, "source1");
let history = registry.get_history_by_key(&key).await;
assert_eq!(history.len(), 2);
assert_eq!(history[0].message, "First message");
assert_eq!(history[1].message, "Second message");
assert_eq!(history[1].level, LogLevel::Error);
}
#[tokio::test]
async fn test_max_history_limit() {
let registry = ComponentLogRegistry::with_capacity(3, 10);
for i in 0..5 {
let msg = LogMessage::with_instance(
LogLevel::Info,
format!("Message {i}"),
"instance1",
"source1",
ComponentType::Source,
);
registry.log(msg).await;
}
let key = make_key("instance1", ComponentType::Source, "source1");
let history = registry.get_history_by_key(&key).await;
assert_eq!(history.len(), 3);
assert_eq!(history[0].message, "Message 2");
assert_eq!(history[2].message, "Message 4");
}
#[tokio::test]
async fn test_subscribe_gets_history_and_live() {
let registry = Arc::new(ComponentLogRegistry::new());
let msg1 = LogMessage::with_instance(
LogLevel::Info,
"History 1",
"instance1",
"source1",
ComponentType::Source,
);
registry.log(msg1).await;
let key = make_key("instance1", ComponentType::Source, "source1");
let (history, mut receiver) = registry.subscribe_by_key(&key).await;
assert_eq!(history.len(), 1);
assert_eq!(history[0].message, "History 1");
let registry_clone = registry.clone();
tokio::spawn(async move {
tokio::task::yield_now().await;
let msg2 = LogMessage::with_instance(
LogLevel::Info,
"Live message",
"instance1",
"source1",
ComponentType::Source,
);
registry_clone.log(msg2).await;
});
let live_msg = receiver.recv().await.unwrap();
assert_eq!(live_msg.message, "Live message");
}
#[tokio::test]
async fn test_remove_component() {
let registry = ComponentLogRegistry::new();
let msg = LogMessage::with_instance(
LogLevel::Info,
"Test",
"instance1",
"source1",
ComponentType::Source,
);
registry.log(msg).await;
let key = make_key("instance1", ComponentType::Source, "source1");
assert_eq!(registry.log_count_by_key(&key).await, 1);
registry.remove_component_by_key(&key).await;
assert_eq!(registry.log_count_by_key(&key).await, 0);
}
#[tokio::test]
async fn test_multiple_components() {
let registry = ComponentLogRegistry::new();
let msg1 = LogMessage::with_instance(
LogLevel::Info,
"Source log",
"instance1",
"source1",
ComponentType::Source,
);
let msg2 = LogMessage::with_instance(
LogLevel::Info,
"Query log",
"instance1",
"query1",
ComponentType::Query,
);
registry.log(msg1).await;
registry.log(msg2).await;
let source_key = make_key("instance1", ComponentType::Source, "source1");
let query_key = make_key("instance1", ComponentType::Query, "query1");
let source_history = registry.get_history_by_key(&source_key).await;
let query_history = registry.get_history_by_key(&query_key).await;
assert_eq!(source_history.len(), 1);
assert_eq!(query_history.len(), 1);
assert_eq!(source_history[0].component_type, ComponentType::Source);
assert_eq!(query_history[0].component_type, ComponentType::Query);
}
#[tokio::test]
async fn test_instance_isolation() {
let registry = ComponentLogRegistry::new();
let msg1 = LogMessage::with_instance(
LogLevel::Info,
"Instance 1 log",
"instance1",
"my-source",
ComponentType::Source,
);
let msg2 = LogMessage::with_instance(
LogLevel::Info,
"Instance 2 log",
"instance2",
"my-source",
ComponentType::Source,
);
registry.log(msg1).await;
registry.log(msg2).await;
let key1 = make_key("instance1", ComponentType::Source, "my-source");
let key2 = make_key("instance2", ComponentType::Source, "my-source");
let history1 = registry.get_history_by_key(&key1).await;
let history2 = registry.get_history_by_key(&key2).await;
assert_eq!(history1.len(), 1);
assert_eq!(history2.len(), 1);
assert_eq!(history1[0].message, "Instance 1 log");
assert_eq!(history2[0].message, "Instance 2 log");
}
#[test]
fn test_component_log_key() {
let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");
assert_eq!(key.to_string_key(), "my-instance:source:my-source");
assert_eq!(key.instance_id, "my-instance");
assert_eq!(key.component_type, ComponentType::Source);
assert_eq!(key.component_id, "my-source");
}
#[test]
fn test_log_level_ordering() {
assert!(LogLevel::Trace < LogLevel::Debug);
assert!(LogLevel::Debug < LogLevel::Info);
assert!(LogLevel::Info < LogLevel::Warn);
assert!(LogLevel::Warn < LogLevel::Error);
}
#[test]
fn test_log_level_display() {
assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
assert_eq!(format!("{}", LogLevel::Info), "INFO");
assert_eq!(format!("{}", LogLevel::Warn), "WARN");
assert_eq!(format!("{}", LogLevel::Error), "ERROR");
}
}