1use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use chrono::{DateTime, Utc};
61use serde::{Deserialize, Serialize};
62use tokio::sync::{broadcast, RwLock};
63
64use crate::channels::ComponentType;
65
66pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
68
69#[derive(Debug, Clone, PartialEq, Eq, Hash)]
74pub struct ComponentLogKey {
75 pub instance_id: String,
77 pub component_type: ComponentType,
79 pub component_id: String,
81}
82
83impl ComponentLogKey {
84 pub fn new(
86 instance_id: impl Into<String>,
87 component_type: ComponentType,
88 component_id: impl Into<String>,
89 ) -> Self {
90 Self {
91 instance_id: instance_id.into(),
92 component_type,
93 component_id: component_id.into(),
94 }
95 }
96
97 pub fn from_str_key(key: &str) -> Option<Self> {
100 let parts: Vec<&str> = key.split(':').collect();
101 match parts.len() {
102 1 => None, 3 => {
104 let component_type = match parts[1].to_lowercase().as_str() {
105 "source" => ComponentType::Source,
106 "query" => ComponentType::Query,
107 "reaction" => ComponentType::Reaction,
108 _ => return None,
109 };
110 Some(Self {
111 instance_id: parts[0].to_string(),
112 component_type,
113 component_id: parts[2].to_string(),
114 })
115 }
116 _ => None,
117 }
118 }
119
120 pub fn to_string_key(&self) -> String {
122 let type_str = match self.component_type {
123 ComponentType::Source => "source",
124 ComponentType::Query => "query",
125 ComponentType::Reaction => "reaction",
126 };
127 format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
128 }
129}
130
131impl std::fmt::Display for ComponentLogKey {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 write!(f, "{}", self.to_string_key())
134 }
135}
136
137pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
145pub enum LogLevel {
146 Trace,
148 Debug,
150 Info,
152 Warn,
154 Error,
156}
157
158impl std::fmt::Display for LogLevel {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 match self {
161 LogLevel::Trace => write!(f, "TRACE"),
162 LogLevel::Debug => write!(f, "DEBUG"),
163 LogLevel::Info => write!(f, "INFO"),
164 LogLevel::Warn => write!(f, "WARN"),
165 LogLevel::Error => write!(f, "ERROR"),
166 }
167 }
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct LogMessage {
176 pub timestamp: DateTime<Utc>,
178 pub level: LogLevel,
180 pub message: String,
182 pub instance_id: String,
184 pub component_id: String,
186 pub component_type: ComponentType,
188}
189
190impl LogMessage {
191 pub fn new(
193 level: LogLevel,
194 message: impl Into<String>,
195 component_id: impl Into<String>,
196 component_type: ComponentType,
197 ) -> Self {
198 Self::with_instance(level, message, "", component_id, component_type)
199 }
200
201 pub fn with_instance(
203 level: LogLevel,
204 message: impl Into<String>,
205 instance_id: impl Into<String>,
206 component_id: impl Into<String>,
207 component_type: ComponentType,
208 ) -> Self {
209 Self {
210 timestamp: Utc::now(),
211 level,
212 message: message.into(),
213 instance_id: instance_id.into(),
214 component_id: component_id.into(),
215 component_type,
216 }
217 }
218
219 pub fn key(&self) -> ComponentLogKey {
221 ComponentLogKey::new(
222 self.instance_id.clone(),
223 self.component_type.clone(),
224 self.component_id.clone(),
225 )
226 }
227}
228
229struct ComponentLogChannel {
231 history: VecDeque<LogMessage>,
233 max_history: usize,
235 sender: broadcast::Sender<LogMessage>,
237}
238
239impl ComponentLogChannel {
240 fn new(max_history: usize, channel_capacity: usize) -> Self {
241 let (sender, _) = broadcast::channel(channel_capacity);
242 Self {
243 history: VecDeque::with_capacity(max_history),
244 max_history,
245 sender,
246 }
247 }
248
249 fn log(&mut self, message: LogMessage) {
250 if self.history.len() >= self.max_history {
252 self.history.pop_front();
253 }
254 self.history.push_back(message.clone());
255
256 let _ = self.sender.send(message);
258 }
259
260 fn get_history(&self) -> Vec<LogMessage> {
261 self.history.iter().cloned().collect()
262 }
263
264 fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
265 self.sender.subscribe()
266 }
267}
268
269pub struct ComponentLogRegistry {
274 channels: RwLock<HashMap<String, ComponentLogChannel>>,
276 max_history: usize,
278 channel_capacity: usize,
280}
281
282impl std::fmt::Debug for ComponentLogRegistry {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 f.debug_struct("ComponentLogRegistry")
285 .field("max_history", &self.max_history)
286 .field("channel_capacity", &self.channel_capacity)
287 .finish()
288 }
289}
290
291impl Default for ComponentLogRegistry {
292 fn default() -> Self {
293 Self::new()
294 }
295}
296
297impl ComponentLogRegistry {
298 pub fn new() -> Self {
300 Self {
301 channels: RwLock::new(HashMap::new()),
302 max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
303 channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
304 }
305 }
306
307 pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
309 Self {
310 channels: RwLock::new(HashMap::new()),
311 max_history,
312 channel_capacity,
313 }
314 }
315
316 pub async fn log(&self, message: LogMessage) {
321 let key = message.key().to_string_key();
322 let mut channels = self.channels.write().await;
323 let channel = channels
324 .entry(key)
325 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
326 channel.log(message);
327 }
328
329 pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
333 let channels = self.channels.read().await;
334 channels
335 .get(&key.to_string_key())
336 .map(|c| c.get_history())
337 .unwrap_or_default()
338 }
339
340 #[deprecated(note = "Use get_history_by_key with ComponentLogKey for instance isolation")]
344 pub async fn get_history(&self, component_id: &str) -> Vec<LogMessage> {
345 let channels = self.channels.read().await;
346 channels
347 .get(component_id)
348 .map(|c| c.get_history())
349 .unwrap_or_default()
350 }
351
352 pub async fn subscribe_by_key(
357 &self,
358 key: &ComponentLogKey,
359 ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
360 let mut channels = self.channels.write().await;
361 let channel = channels
362 .entry(key.to_string_key())
363 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
364
365 let history = channel.get_history();
366 let receiver = channel.subscribe();
367 (history, receiver)
368 }
369
370 #[deprecated(note = "Use subscribe_by_key with ComponentLogKey for instance isolation")]
375 pub async fn subscribe(
376 &self,
377 component_id: &str,
378 ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
379 let mut channels = self.channels.write().await;
380 let channel = channels
381 .entry(component_id.to_string())
382 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
383
384 let history = channel.get_history();
385 let receiver = channel.subscribe();
386 (history, receiver)
387 }
388
389 pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
393 self.channels.write().await.remove(&key.to_string_key());
394 }
395
396 #[deprecated(note = "Use remove_component_by_key with ComponentLogKey for instance isolation")]
400 pub async fn remove_component(&self, component_id: &str) {
401 self.channels.write().await.remove(component_id);
402 }
403
404 pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
406 self.channels
407 .read()
408 .await
409 .get(&key.to_string_key())
410 .map(|c| c.history.len())
411 .unwrap_or(0)
412 }
413
414 #[deprecated(note = "Use log_count_by_key with ComponentLogKey for instance isolation")]
416 pub async fn log_count(&self, component_id: &str) -> usize {
417 self.channels
418 .read()
419 .await
420 .get(component_id)
421 .map(|c| c.history.len())
422 .unwrap_or(0)
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429 use tokio::time::{sleep, Duration};
430
431 fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
432 ComponentLogKey::new(instance, component_type, component)
433 }
434
435 #[tokio::test]
436 async fn test_log_and_get_history() {
437 let registry = ComponentLogRegistry::new();
438
439 let msg1 = LogMessage::with_instance(
440 LogLevel::Info,
441 "First message",
442 "instance1",
443 "source1",
444 ComponentType::Source,
445 );
446 let msg2 = LogMessage::with_instance(
447 LogLevel::Error,
448 "Second message",
449 "instance1",
450 "source1",
451 ComponentType::Source,
452 );
453
454 registry.log(msg1).await;
455 registry.log(msg2).await;
456
457 let key = make_key("instance1", ComponentType::Source, "source1");
458 let history = registry.get_history_by_key(&key).await;
459 assert_eq!(history.len(), 2);
460 assert_eq!(history[0].message, "First message");
461 assert_eq!(history[1].message, "Second message");
462 assert_eq!(history[1].level, LogLevel::Error);
463 }
464
465 #[tokio::test]
466 async fn test_max_history_limit() {
467 let registry = ComponentLogRegistry::with_capacity(3, 10);
468
469 for i in 0..5 {
470 let msg = LogMessage::with_instance(
471 LogLevel::Info,
472 format!("Message {i}"),
473 "instance1",
474 "source1",
475 ComponentType::Source,
476 );
477 registry.log(msg).await;
478 }
479
480 let key = make_key("instance1", ComponentType::Source, "source1");
481 let history = registry.get_history_by_key(&key).await;
482 assert_eq!(history.len(), 3);
483 assert_eq!(history[0].message, "Message 2");
485 assert_eq!(history[2].message, "Message 4");
486 }
487
488 #[tokio::test]
489 async fn test_subscribe_gets_history_and_live() {
490 let registry = Arc::new(ComponentLogRegistry::new());
491
492 let msg1 = LogMessage::with_instance(
494 LogLevel::Info,
495 "History 1",
496 "instance1",
497 "source1",
498 ComponentType::Source,
499 );
500 registry.log(msg1).await;
501
502 let key = make_key("instance1", ComponentType::Source, "source1");
504 let (history, mut receiver) = registry.subscribe_by_key(&key).await;
505 assert_eq!(history.len(), 1);
506 assert_eq!(history[0].message, "History 1");
507
508 let registry_clone = registry.clone();
510 tokio::spawn(async move {
511 sleep(Duration::from_millis(10)).await;
512 let msg2 = LogMessage::with_instance(
513 LogLevel::Info,
514 "Live message",
515 "instance1",
516 "source1",
517 ComponentType::Source,
518 );
519 registry_clone.log(msg2).await;
520 });
521
522 let live_msg = receiver.recv().await.unwrap();
524 assert_eq!(live_msg.message, "Live message");
525 }
526
527 #[tokio::test]
528 async fn test_remove_component() {
529 let registry = ComponentLogRegistry::new();
530
531 let msg = LogMessage::with_instance(
532 LogLevel::Info,
533 "Test",
534 "instance1",
535 "source1",
536 ComponentType::Source,
537 );
538 registry.log(msg).await;
539
540 let key = make_key("instance1", ComponentType::Source, "source1");
541 assert_eq!(registry.log_count_by_key(&key).await, 1);
542
543 registry.remove_component_by_key(&key).await;
544
545 assert_eq!(registry.log_count_by_key(&key).await, 0);
546 }
547
548 #[tokio::test]
549 async fn test_multiple_components() {
550 let registry = ComponentLogRegistry::new();
551
552 let msg1 = LogMessage::with_instance(
553 LogLevel::Info,
554 "Source log",
555 "instance1",
556 "source1",
557 ComponentType::Source,
558 );
559 let msg2 = LogMessage::with_instance(
560 LogLevel::Info,
561 "Query log",
562 "instance1",
563 "query1",
564 ComponentType::Query,
565 );
566
567 registry.log(msg1).await;
568 registry.log(msg2).await;
569
570 let source_key = make_key("instance1", ComponentType::Source, "source1");
571 let query_key = make_key("instance1", ComponentType::Query, "query1");
572
573 let source_history = registry.get_history_by_key(&source_key).await;
574 let query_history = registry.get_history_by_key(&query_key).await;
575
576 assert_eq!(source_history.len(), 1);
577 assert_eq!(query_history.len(), 1);
578 assert_eq!(source_history[0].component_type, ComponentType::Source);
579 assert_eq!(query_history[0].component_type, ComponentType::Query);
580 }
581
582 #[tokio::test]
583 async fn test_instance_isolation() {
584 let registry = ComponentLogRegistry::new();
586
587 let msg1 = LogMessage::with_instance(
589 LogLevel::Info,
590 "Instance 1 log",
591 "instance1",
592 "my-source",
593 ComponentType::Source,
594 );
595 let msg2 = LogMessage::with_instance(
596 LogLevel::Info,
597 "Instance 2 log",
598 "instance2",
599 "my-source",
600 ComponentType::Source,
601 );
602
603 registry.log(msg1).await;
604 registry.log(msg2).await;
605
606 let key1 = make_key("instance1", ComponentType::Source, "my-source");
607 let key2 = make_key("instance2", ComponentType::Source, "my-source");
608
609 let history1 = registry.get_history_by_key(&key1).await;
610 let history2 = registry.get_history_by_key(&key2).await;
611
612 assert_eq!(history1.len(), 1);
614 assert_eq!(history2.len(), 1);
615 assert_eq!(history1[0].message, "Instance 1 log");
616 assert_eq!(history2[0].message, "Instance 2 log");
617 }
618
619 #[test]
620 fn test_component_log_key() {
621 let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");
622 assert_eq!(key.to_string_key(), "my-instance:source:my-source");
623 assert_eq!(key.instance_id, "my-instance");
624 assert_eq!(key.component_type, ComponentType::Source);
625 assert_eq!(key.component_id, "my-source");
626 }
627
628 #[test]
629 fn test_log_level_ordering() {
630 assert!(LogLevel::Trace < LogLevel::Debug);
631 assert!(LogLevel::Debug < LogLevel::Info);
632 assert!(LogLevel::Info < LogLevel::Warn);
633 assert!(LogLevel::Warn < LogLevel::Error);
634 }
635
636 #[test]
637 fn test_log_level_display() {
638 assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
639 assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
640 assert_eq!(format!("{}", LogLevel::Info), "INFO");
641 assert_eq!(format!("{}", LogLevel::Warn), "WARN");
642 assert_eq!(format!("{}", LogLevel::Error), "ERROR");
643 }
644}