1use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use chrono::{DateTime, Utc};
61use serde::{Deserialize, Serialize};
62use tokio::sync::{broadcast, RwLock};
63
64use crate::channels::ComponentType;
65
/// Default number of log messages retained per component.
///
/// `ComponentLogRegistry::new` uses this as the history limit of every channel it
/// creates.
pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
68
/// Identifies one component's log stream by instance, component type and component ID.
///
/// `to_string_key` renders it as `"{instance_id}:{type}:{component_id}"`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ComponentLogKey {
    /// Identifier of the instance the component belongs to.
    pub instance_id: String,
    /// Kind of component (source, query, reaction, ...).
    pub component_type: ComponentType,
    /// Identifier of the component itself.
    pub component_id: String,
}
82
83impl ComponentLogKey {
84 pub fn new(
86 instance_id: impl Into<String>,
87 component_type: ComponentType,
88 component_id: impl Into<String>,
89 ) -> Self {
90 Self {
91 instance_id: instance_id.into(),
92 component_type,
93 component_id: component_id.into(),
94 }
95 }
96
97 pub fn from_str_key(key: &str) -> Option<Self> {
100 let parts: Vec<&str> = key.split(':').collect();
101 match parts.len() {
102 1 => None, 3 => {
104 let component_type = match parts[1].to_lowercase().as_str() {
105 "source" => ComponentType::Source,
106 "query" => ComponentType::Query,
107 "reaction" => ComponentType::Reaction,
108 _ => return None,
109 };
110 Some(Self {
111 instance_id: parts[0].to_string(),
112 component_type,
113 component_id: parts[2].to_string(),
114 })
115 }
116 _ => None,
117 }
118 }
119
120 pub fn to_string_key(&self) -> String {
122 let type_str = match self.component_type {
123 ComponentType::Source => "source",
124 ComponentType::Query => "query",
125 ComponentType::Reaction => "reaction",
126 ComponentType::BootstrapProvider => "bootstrap_provider",
127 ComponentType::IdentityProvider => "identity_provider",
128 };
129 format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
130 }
131}
132
133impl std::fmt::Display for ComponentLogKey {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 write!(f, "{}", self.to_string_key())
136 }
137}
138
/// Default capacity of each component's broadcast channel for live log messages.
///
/// `ComponentLogRegistry::new` passes this to every channel it creates.
pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
141
/// Severity of a log message.
///
/// Variants are declared from least to most severe, so the derived ordering gives
/// `Trace < Debug < Info < Warn < Error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum LogLevel {
    /// Lowest level in the ordering.
    Trace,
    /// Ranks above `Trace`, below `Info`.
    Debug,
    /// Ranks above `Debug`, below `Warn`.
    Info,
    /// Ranks above `Info`, below `Error`.
    Warn,
    /// Highest level in the ordering.
    Error,
}
159
160impl std::fmt::Display for LogLevel {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 LogLevel::Trace => write!(f, "TRACE"),
164 LogLevel::Debug => write!(f, "DEBUG"),
165 LogLevel::Info => write!(f, "INFO"),
166 LogLevel::Warn => write!(f, "WARN"),
167 LogLevel::Error => write!(f, "ERROR"),
168 }
169 }
170}
171
/// A single log entry attributed to a component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogMessage {
    /// UTC timestamp; the constructors set it to `Utc::now()`.
    pub timestamp: DateTime<Utc>,
    /// Severity of the entry.
    pub level: LogLevel,
    /// Log text.
    pub message: String,
    /// Instance the component belongs to; empty when built with `LogMessage::new`.
    pub instance_id: String,
    /// Identifier of the component the entry is attributed to.
    pub component_id: String,
    /// Kind of component the entry is attributed to.
    pub component_type: ComponentType,
}
191
192impl LogMessage {
193 pub fn new(
195 level: LogLevel,
196 message: impl Into<String>,
197 component_id: impl Into<String>,
198 component_type: ComponentType,
199 ) -> Self {
200 Self::with_instance(level, message, "", component_id, component_type)
201 }
202
203 pub fn with_instance(
205 level: LogLevel,
206 message: impl Into<String>,
207 instance_id: impl Into<String>,
208 component_id: impl Into<String>,
209 component_type: ComponentType,
210 ) -> Self {
211 Self {
212 timestamp: Utc::now(),
213 level,
214 message: message.into(),
215 instance_id: instance_id.into(),
216 component_id: component_id.into(),
217 component_type,
218 }
219 }
220
221 pub fn key(&self) -> ComponentLogKey {
223 ComponentLogKey::new(
224 self.instance_id.clone(),
225 self.component_type.clone(),
226 self.component_id.clone(),
227 )
228 }
229}
230
/// Per-component log state: a bounded history buffer plus a broadcast sender for
/// live subscribers.
struct ComponentLogChannel {
    /// Retained messages, oldest at the front.
    history: VecDeque<LogMessage>,
    /// Limit on the length of `history`, applied by `log`.
    max_history: usize,
    /// Sender half of the broadcast channel; `subscribe` creates receivers from it.
    sender: broadcast::Sender<LogMessage>,
}
240
241impl ComponentLogChannel {
242 fn new(max_history: usize, channel_capacity: usize) -> Self {
243 let (sender, _) = broadcast::channel(channel_capacity);
244 Self {
245 history: VecDeque::with_capacity(max_history),
246 max_history,
247 sender,
248 }
249 }
250
251 fn log(&mut self, message: LogMessage) {
252 if self.history.len() >= self.max_history {
254 self.history.pop_front();
255 }
256 self.history.push_back(message.clone());
257
258 let _ = self.sender.send(message);
260 }
261
262 fn get_history(&self) -> Vec<LogMessage> {
263 self.history.iter().cloned().collect()
264 }
265
266 fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
267 self.sender.subscribe()
268 }
269}
270
/// Registry of per-component log channels, keyed by the string form of a
/// [`ComponentLogKey`].
pub struct ComponentLogRegistry {
    /// Channels keyed by `ComponentLogKey::to_string_key`, behind an async `RwLock`.
    channels: RwLock<HashMap<String, ComponentLogChannel>>,
    /// History limit handed to each newly created channel.
    max_history: usize,
    /// Broadcast capacity handed to each newly created channel.
    channel_capacity: usize,
}
283
284impl std::fmt::Debug for ComponentLogRegistry {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 f.debug_struct("ComponentLogRegistry")
287 .field("max_history", &self.max_history)
288 .field("channel_capacity", &self.channel_capacity)
289 .finish()
290 }
291}
292
293impl Default for ComponentLogRegistry {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299impl ComponentLogRegistry {
300 pub fn new() -> Self {
302 Self {
303 channels: RwLock::new(HashMap::new()),
304 max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
305 channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
306 }
307 }
308
309 pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
311 Self {
312 channels: RwLock::new(HashMap::new()),
313 max_history,
314 channel_capacity,
315 }
316 }
317
318 pub async fn log(&self, message: LogMessage) {
323 let key = message.key().to_string_key();
324 let mut channels = self.channels.write().await;
325 let channel = channels
326 .entry(key)
327 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
328 channel.log(message);
329 }
330
331 pub fn try_log(&self, message: LogMessage) -> bool {
337 match self.channels.try_write() {
338 Ok(mut channels) => {
339 let key = message.key().to_string_key();
340 let channel = channels.entry(key).or_insert_with(|| {
341 ComponentLogChannel::new(self.max_history, self.channel_capacity)
342 });
343 channel.log(message);
344 true
345 }
346 Err(_) => false,
347 }
348 }
349
350 pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
354 let channels = self.channels.read().await;
355 channels
356 .get(&key.to_string_key())
357 .map(|c| c.get_history())
358 .unwrap_or_default()
359 }
360
361 pub async fn subscribe_by_key(
366 &self,
367 key: &ComponentLogKey,
368 ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
369 let mut channels = self.channels.write().await;
370 let channel = channels
371 .entry(key.to_string_key())
372 .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
373
374 let history = channel.get_history();
375 let receiver = channel.subscribe();
376 (history, receiver)
377 }
378
379 pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
383 self.channels.write().await.remove(&key.to_string_key());
384 }
385
386 pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
388 self.channels
389 .read()
390 .await
391 .get(&key.to_string_key())
392 .map(|c| c.history.len())
393 .unwrap_or(0)
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a [`ComponentLogKey`].
    fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
        ComponentLogKey::new(instance, component_type, component)
    }

    /// Builds an `Info`-level message for the given instance and component.
    fn info_message(
        text: impl Into<String>,
        instance: &str,
        component: &str,
        component_type: ComponentType,
    ) -> LogMessage {
        LogMessage::with_instance(LogLevel::Info, text, instance, component, component_type)
    }

    #[tokio::test]
    async fn test_log_and_get_history() {
        let registry = ComponentLogRegistry::new();

        let first = info_message("First message", "instance1", "source1", ComponentType::Source);
        let second = LogMessage::with_instance(
            LogLevel::Error,
            "Second message",
            "instance1",
            "source1",
            ComponentType::Source,
        );
        registry.log(first).await;
        registry.log(second).await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        let history = registry.get_history_by_key(&key).await;
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].message, "First message");
        assert_eq!(history[1].message, "Second message");
        assert_eq!(history[1].level, LogLevel::Error);
    }

    #[tokio::test]
    async fn test_max_history_limit() {
        // History limit of 3, so only the last three of five messages survive.
        let registry = ComponentLogRegistry::with_capacity(3, 10);

        for i in 0..5 {
            let text = format!("Message {i}");
            let msg = info_message(text, "instance1", "source1", ComponentType::Source);
            registry.log(msg).await;
        }

        let key = make_key("instance1", ComponentType::Source, "source1");
        let history = registry.get_history_by_key(&key).await;
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].message, "Message 2");
        assert_eq!(history[2].message, "Message 4");
    }

    #[tokio::test]
    async fn test_subscribe_gets_history_and_live() {
        let registry = Arc::new(ComponentLogRegistry::new());

        // Logged before subscribing, so it must show up in the returned history.
        let past = info_message("History 1", "instance1", "source1", ComponentType::Source);
        registry.log(past).await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        let (history, mut receiver) = registry.subscribe_by_key(&key).await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].message, "History 1");

        // Logged from another task after subscribing, so it arrives on the receiver.
        let registry_clone = registry.clone();
        tokio::spawn(async move {
            tokio::task::yield_now().await;
            let live =
                info_message("Live message", "instance1", "source1", ComponentType::Source);
            registry_clone.log(live).await;
        });

        let live_msg = receiver.recv().await.unwrap();
        assert_eq!(live_msg.message, "Live message");
    }

    #[tokio::test]
    async fn test_remove_component() {
        let registry = ComponentLogRegistry::new();

        let msg = info_message("Test", "instance1", "source1", ComponentType::Source);
        registry.log(msg).await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        assert_eq!(registry.log_count_by_key(&key).await, 1);

        registry.remove_component_by_key(&key).await;

        assert_eq!(registry.log_count_by_key(&key).await, 0);
    }

    #[tokio::test]
    async fn test_multiple_components() {
        let registry = ComponentLogRegistry::new();

        let source_msg = info_message("Source log", "instance1", "source1", ComponentType::Source);
        let query_msg = info_message("Query log", "instance1", "query1", ComponentType::Query);
        registry.log(source_msg).await;
        registry.log(query_msg).await;

        let source_key = make_key("instance1", ComponentType::Source, "source1");
        let query_key = make_key("instance1", ComponentType::Query, "query1");

        let source_history = registry.get_history_by_key(&source_key).await;
        let query_history = registry.get_history_by_key(&query_key).await;

        assert_eq!(source_history.len(), 1);
        assert_eq!(query_history.len(), 1);
        assert_eq!(source_history[0].component_type, ComponentType::Source);
        assert_eq!(query_history[0].component_type, ComponentType::Query);
    }

    #[tokio::test]
    async fn test_instance_isolation() {
        let registry = ComponentLogRegistry::new();

        // Same component ID and type under two different instances.
        let msg1 = info_message("Instance 1 log", "instance1", "my-source", ComponentType::Source);
        let msg2 = info_message("Instance 2 log", "instance2", "my-source", ComponentType::Source);
        registry.log(msg1).await;
        registry.log(msg2).await;

        let key1 = make_key("instance1", ComponentType::Source, "my-source");
        let key2 = make_key("instance2", ComponentType::Source, "my-source");

        let history1 = registry.get_history_by_key(&key1).await;
        let history2 = registry.get_history_by_key(&key2).await;

        assert_eq!(history1.len(), 1);
        assert_eq!(history2.len(), 1);
        assert_eq!(history1[0].message, "Instance 1 log");
        assert_eq!(history2[0].message, "Instance 2 log");
    }

    #[test]
    fn test_component_log_key() {
        let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");
        assert_eq!(key.to_string_key(), "my-instance:source:my-source");
        assert_eq!(key.instance_id, "my-instance");
        assert_eq!(key.component_type, ComponentType::Source);
        assert_eq!(key.component_id, "my-source");
    }

    #[test]
    fn test_log_level_ordering() {
        assert!(LogLevel::Trace < LogLevel::Debug);
        assert!(LogLevel::Debug < LogLevel::Info);
        assert!(LogLevel::Info < LogLevel::Warn);
        assert!(LogLevel::Warn < LogLevel::Error);
    }

    #[test]
    fn test_log_level_display() {
        assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
        assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
        assert_eq!(format!("{}", LogLevel::Info), "INFO");
        assert_eq!(format!("{}", LogLevel::Warn), "WARN");
        assert_eq!(format!("{}", LogLevel::Error), "ERROR");
    }
}