1use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use chrono::{DateTime, Utc};
61use serde::{Deserialize, Serialize};
62use tokio::sync::{broadcast, RwLock};
63
64use crate::channels::ComponentType;
65
66pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
68
69#[derive(Debug, Clone, PartialEq, Eq, Hash)]
74pub struct ComponentLogKey {
75 pub instance_id: String,
77 pub component_type: ComponentType,
79 pub component_id: String,
81}
82
83impl ComponentLogKey {
84 pub fn new(
86 instance_id: impl Into<String>,
87 component_type: ComponentType,
88 component_id: impl Into<String>,
89 ) -> Self {
90 Self {
91 instance_id: instance_id.into(),
92 component_type,
93 component_id: component_id.into(),
94 }
95 }
96
97 pub fn from_str_key(key: &str) -> Option<Self> {
100 let parts: Vec<&str> = key.split(':').collect();
101 match parts.len() {
102 1 => None, 3 => {
104 let component_type = match parts[1].to_lowercase().as_str() {
105 "source" => ComponentType::Source,
106 "query" => ComponentType::Query,
107 "reaction" => ComponentType::Reaction,
108 _ => return None,
109 };
110 Some(Self {
111 instance_id: parts[0].to_string(),
112 component_type,
113 component_id: parts[2].to_string(),
114 })
115 }
116 _ => None,
117 }
118 }
119
120 pub fn to_string_key(&self) -> String {
122 let type_str = match self.component_type {
123 ComponentType::Source => "source",
124 ComponentType::Query => "query",
125 ComponentType::Reaction => "reaction",
126 };
127 format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
128 }
129}
130
131impl std::fmt::Display for ComponentLogKey {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 write!(f, "{}", self.to_string_key())
134 }
135}
136
137pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
145pub enum LogLevel {
146 Trace,
148 Debug,
150 Info,
152 Warn,
154 Error,
156}
157
158impl std::fmt::Display for LogLevel {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 match self {
161 LogLevel::Trace => write!(f, "TRACE"),
162 LogLevel::Debug => write!(f, "DEBUG"),
163 LogLevel::Info => write!(f, "INFO"),
164 LogLevel::Warn => write!(f, "WARN"),
165 LogLevel::Error => write!(f, "ERROR"),
166 }
167 }
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct LogMessage {
176 pub timestamp: DateTime<Utc>,
178 pub level: LogLevel,
180 pub message: String,
182 pub instance_id: String,
184 pub component_id: String,
186 pub component_type: ComponentType,
188}
189
190impl LogMessage {
191 pub fn new(
193 level: LogLevel,
194 message: impl Into<String>,
195 component_id: impl Into<String>,
196 component_type: ComponentType,
197 ) -> Self {
198 Self::with_instance(level, message, "", component_id, component_type)
199 }
200
201 pub fn with_instance(
203 level: LogLevel,
204 message: impl Into<String>,
205 instance_id: impl Into<String>,
206 component_id: impl Into<String>,
207 component_type: ComponentType,
208 ) -> Self {
209 Self {
210 timestamp: Utc::now(),
211 level,
212 message: message.into(),
213 instance_id: instance_id.into(),
214 component_id: component_id.into(),
215 component_type,
216 }
217 }
218
219 pub fn key(&self) -> ComponentLogKey {
221 ComponentLogKey::new(
222 self.instance_id.clone(),
223 self.component_type.clone(),
224 self.component_id.clone(),
225 )
226 }
227}
228
229struct ComponentLogChannel {
231 history: VecDeque<LogMessage>,
233 max_history: usize,
235 sender: broadcast::Sender<LogMessage>,
237}
238
239impl ComponentLogChannel {
240 fn new(max_history: usize, channel_capacity: usize) -> Self {
241 let (sender, _) = broadcast::channel(channel_capacity);
242 Self {
243 history: VecDeque::with_capacity(max_history),
244 max_history,
245 sender,
246 }
247 }
248
249 fn log(&mut self, message: LogMessage) {
250 if self.history.len() >= self.max_history {
252 self.history.pop_front();
253 }
254 self.history.push_back(message.clone());
255
256 let _ = self.sender.send(message);
258 }
259
260 fn get_history(&self) -> Vec<LogMessage> {
261 self.history.iter().cloned().collect()
262 }
263
264 fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
265 self.sender.subscribe()
266 }
267}
268
269pub struct ComponentLogRegistry {
274 channels: RwLock<HashMap<String, ComponentLogChannel>>,
276 max_history: usize,
278 channel_capacity: usize,
280}
281
282impl std::fmt::Debug for ComponentLogRegistry {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 f.debug_struct("ComponentLogRegistry")
285 .field("max_history", &self.max_history)
286 .field("channel_capacity", &self.channel_capacity)
287 .finish()
288 }
289}
290
291impl Default for ComponentLogRegistry {
292 fn default() -> Self {
293 Self::new()
294 }
295}
296
297impl ComponentLogRegistry {
298 pub fn new() -> Self {
300 Self {
301 channels: RwLock::new(HashMap::new()),
302 max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
303 channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
304 }
305 }
306
307 pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
309 Self {
310 channels: RwLock::new(HashMap::new()),
311 max_history,
312 channel_capacity,
313 }
314 }
315
316 pub async fn log(&self, message: LogMessage) {
321 let key = message.key().to_string_key();
322 let mut channels = self.channels.write().await;
323 let channel = channels
324 .entry(key)
325 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
326 channel.log(message);
327 }
328
329 pub fn try_log(&self, message: LogMessage) -> bool {
335 match self.channels.try_write() {
336 Ok(mut channels) => {
337 let key = message.key().to_string_key();
338 let channel = channels.entry(key).or_insert_with(|| {
339 ComponentLogChannel::new(self.max_history, self.channel_capacity)
340 });
341 channel.log(message);
342 true
343 }
344 Err(_) => false,
345 }
346 }
347
348 pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
352 let channels = self.channels.read().await;
353 channels
354 .get(&key.to_string_key())
355 .map(|c| c.get_history())
356 .unwrap_or_default()
357 }
358
359 #[deprecated(note = "Use get_history_by_key with ComponentLogKey for instance isolation")]
363 pub async fn get_history(&self, component_id: &str) -> Vec<LogMessage> {
364 let channels = self.channels.read().await;
365 channels
366 .get(component_id)
367 .map(|c| c.get_history())
368 .unwrap_or_default()
369 }
370
371 pub async fn subscribe_by_key(
376 &self,
377 key: &ComponentLogKey,
378 ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
379 let mut channels = self.channels.write().await;
380 let channel = channels
381 .entry(key.to_string_key())
382 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
383
384 let history = channel.get_history();
385 let receiver = channel.subscribe();
386 (history, receiver)
387 }
388
389 #[deprecated(note = "Use subscribe_by_key with ComponentLogKey for instance isolation")]
394 pub async fn subscribe(
395 &self,
396 component_id: &str,
397 ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
398 let mut channels = self.channels.write().await;
399 let channel = channels
400 .entry(component_id.to_string())
401 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
402
403 let history = channel.get_history();
404 let receiver = channel.subscribe();
405 (history, receiver)
406 }
407
408 pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
412 self.channels.write().await.remove(&key.to_string_key());
413 }
414
415 #[deprecated(note = "Use remove_component_by_key with ComponentLogKey for instance isolation")]
419 pub async fn remove_component(&self, component_id: &str) {
420 self.channels.write().await.remove(component_id);
421 }
422
423 pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
425 self.channels
426 .read()
427 .await
428 .get(&key.to_string_key())
429 .map(|c| c.history.len())
430 .unwrap_or(0)
431 }
432
433 #[deprecated(note = "Use log_count_by_key with ComponentLogKey for instance isolation")]
435 pub async fn log_count(&self, component_id: &str) -> usize {
436 self.channels
437 .read()
438 .await
439 .get(component_id)
440 .map(|c| c.history.len())
441 .unwrap_or(0)
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use tokio::time::{sleep, Duration};
449
450 fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
451 ComponentLogKey::new(instance, component_type, component)
452 }
453
454 #[tokio::test]
455 async fn test_log_and_get_history() {
456 let registry = ComponentLogRegistry::new();
457
458 let msg1 = LogMessage::with_instance(
459 LogLevel::Info,
460 "First message",
461 "instance1",
462 "source1",
463 ComponentType::Source,
464 );
465 let msg2 = LogMessage::with_instance(
466 LogLevel::Error,
467 "Second message",
468 "instance1",
469 "source1",
470 ComponentType::Source,
471 );
472
473 registry.log(msg1).await;
474 registry.log(msg2).await;
475
476 let key = make_key("instance1", ComponentType::Source, "source1");
477 let history = registry.get_history_by_key(&key).await;
478 assert_eq!(history.len(), 2);
479 assert_eq!(history[0].message, "First message");
480 assert_eq!(history[1].message, "Second message");
481 assert_eq!(history[1].level, LogLevel::Error);
482 }
483
484 #[tokio::test]
485 async fn test_max_history_limit() {
486 let registry = ComponentLogRegistry::with_capacity(3, 10);
487
488 for i in 0..5 {
489 let msg = LogMessage::with_instance(
490 LogLevel::Info,
491 format!("Message {i}"),
492 "instance1",
493 "source1",
494 ComponentType::Source,
495 );
496 registry.log(msg).await;
497 }
498
499 let key = make_key("instance1", ComponentType::Source, "source1");
500 let history = registry.get_history_by_key(&key).await;
501 assert_eq!(history.len(), 3);
502 assert_eq!(history[0].message, "Message 2");
504 assert_eq!(history[2].message, "Message 4");
505 }
506
507 #[tokio::test]
508 async fn test_subscribe_gets_history_and_live() {
509 let registry = Arc::new(ComponentLogRegistry::new());
510
511 let msg1 = LogMessage::with_instance(
513 LogLevel::Info,
514 "History 1",
515 "instance1",
516 "source1",
517 ComponentType::Source,
518 );
519 registry.log(msg1).await;
520
521 let key = make_key("instance1", ComponentType::Source, "source1");
523 let (history, mut receiver) = registry.subscribe_by_key(&key).await;
524 assert_eq!(history.len(), 1);
525 assert_eq!(history[0].message, "History 1");
526
527 let registry_clone = registry.clone();
529 tokio::spawn(async move {
530 sleep(Duration::from_millis(10)).await;
531 let msg2 = LogMessage::with_instance(
532 LogLevel::Info,
533 "Live message",
534 "instance1",
535 "source1",
536 ComponentType::Source,
537 );
538 registry_clone.log(msg2).await;
539 });
540
541 let live_msg = receiver.recv().await.unwrap();
543 assert_eq!(live_msg.message, "Live message");
544 }
545
546 #[tokio::test]
547 async fn test_remove_component() {
548 let registry = ComponentLogRegistry::new();
549
550 let msg = LogMessage::with_instance(
551 LogLevel::Info,
552 "Test",
553 "instance1",
554 "source1",
555 ComponentType::Source,
556 );
557 registry.log(msg).await;
558
559 let key = make_key("instance1", ComponentType::Source, "source1");
560 assert_eq!(registry.log_count_by_key(&key).await, 1);
561
562 registry.remove_component_by_key(&key).await;
563
564 assert_eq!(registry.log_count_by_key(&key).await, 0);
565 }
566
567 #[tokio::test]
568 async fn test_multiple_components() {
569 let registry = ComponentLogRegistry::new();
570
571 let msg1 = LogMessage::with_instance(
572 LogLevel::Info,
573 "Source log",
574 "instance1",
575 "source1",
576 ComponentType::Source,
577 );
578 let msg2 = LogMessage::with_instance(
579 LogLevel::Info,
580 "Query log",
581 "instance1",
582 "query1",
583 ComponentType::Query,
584 );
585
586 registry.log(msg1).await;
587 registry.log(msg2).await;
588
589 let source_key = make_key("instance1", ComponentType::Source, "source1");
590 let query_key = make_key("instance1", ComponentType::Query, "query1");
591
592 let source_history = registry.get_history_by_key(&source_key).await;
593 let query_history = registry.get_history_by_key(&query_key).await;
594
595 assert_eq!(source_history.len(), 1);
596 assert_eq!(query_history.len(), 1);
597 assert_eq!(source_history[0].component_type, ComponentType::Source);
598 assert_eq!(query_history[0].component_type, ComponentType::Query);
599 }
600
601 #[tokio::test]
602 async fn test_instance_isolation() {
603 let registry = ComponentLogRegistry::new();
605
606 let msg1 = LogMessage::with_instance(
608 LogLevel::Info,
609 "Instance 1 log",
610 "instance1",
611 "my-source",
612 ComponentType::Source,
613 );
614 let msg2 = LogMessage::with_instance(
615 LogLevel::Info,
616 "Instance 2 log",
617 "instance2",
618 "my-source",
619 ComponentType::Source,
620 );
621
622 registry.log(msg1).await;
623 registry.log(msg2).await;
624
625 let key1 = make_key("instance1", ComponentType::Source, "my-source");
626 let key2 = make_key("instance2", ComponentType::Source, "my-source");
627
628 let history1 = registry.get_history_by_key(&key1).await;
629 let history2 = registry.get_history_by_key(&key2).await;
630
631 assert_eq!(history1.len(), 1);
633 assert_eq!(history2.len(), 1);
634 assert_eq!(history1[0].message, "Instance 1 log");
635 assert_eq!(history2[0].message, "Instance 2 log");
636 }
637
638 #[test]
639 fn test_component_log_key() {
640 let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");
641 assert_eq!(key.to_string_key(), "my-instance:source:my-source");
642 assert_eq!(key.instance_id, "my-instance");
643 assert_eq!(key.component_type, ComponentType::Source);
644 assert_eq!(key.component_id, "my-source");
645 }
646
647 #[test]
648 fn test_log_level_ordering() {
649 assert!(LogLevel::Trace < LogLevel::Debug);
650 assert!(LogLevel::Debug < LogLevel::Info);
651 assert!(LogLevel::Info < LogLevel::Warn);
652 assert!(LogLevel::Warn < LogLevel::Error);
653 }
654
655 #[test]
656 fn test_log_level_display() {
657 assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
658 assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
659 assert_eq!(format!("{}", LogLevel::Info), "INFO");
660 assert_eq!(format!("{}", LogLevel::Warn), "WARN");
661 assert_eq!(format!("{}", LogLevel::Error), "ERROR");
662 }
663}