Skip to main content

drasi_lib/managers/
component_log.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Component logging infrastructure for live log streaming.
16//!
17//! This module provides the storage and broadcast infrastructure for component logs.
18//! Logs are captured by the tracing layer (`ComponentLogLayer`) and routed to
19//! per-component streams that can be subscribed to by clients.
20//!
21//! # Architecture
22//!
23//! - `ComponentLogRegistry`: Central registry that manages log channels and history
24//! - `LogMessage`: Structured log message with timestamp, level, and metadata
25//! - `LogLevel`: Log severity levels (Trace, Debug, Info, Warn, Error)
26//!
27//! # Usage
28//!
29//! Log capture is automatic when code runs within a tracing span that has
30//! `component_id` and `component_type` attributes:
31//!
32//! ```ignore
33//! use tracing::Instrument;
34//!
35//! let span = tracing::info_span!(
36//!     "source",
37//!     component_id = %source_id,
38//!     component_type = "source"
39//! );
40//!
41//! async {
42//!     // Both of these are captured:
43//!     tracing::info!("Starting source");
44//!     log::info!("Also captured via tracing-log bridge");
45//! }.instrument(span).await;
46//! ```
47//!
48//! Subscribers can stream logs from a component:
49//!
50//! ```ignore
51//! let (history, mut receiver) = core.subscribe_source_logs("my-source").await?;
52//! while let Ok(log) = receiver.recv().await {
53//!     println!("[{}] {}: {}", log.level, log.component_id, log.message);
54//! }
55//! ```
56
57use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use chrono::{DateTime, Utc};
61use serde::{Deserialize, Serialize};
62use tokio::sync::{broadcast, RwLock};
63
64use crate::channels::ComponentType;
65
/// How many of the most recent log messages each component keeps in its
/// history when the registry is built with default settings.
pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
68
69/// Composite key for identifying a component's log channel.
70///
71/// This ensures logs from different DrasiLib instances with the same component ID
72/// are kept separate.
73#[derive(Debug, Clone, PartialEq, Eq, Hash)]
74pub struct ComponentLogKey {
75    /// The DrasiLib instance ID (or empty string for legacy/default behavior)
76    pub instance_id: String,
77    /// The type of component (Source, Query, Reaction)
78    pub component_type: ComponentType,
79    /// The component's ID within its DrasiLib instance
80    pub component_id: String,
81}
82
83impl ComponentLogKey {
84    /// Create a new composite key.
85    pub fn new(
86        instance_id: impl Into<String>,
87        component_type: ComponentType,
88        component_id: impl Into<String>,
89    ) -> Self {
90        Self {
91            instance_id: instance_id.into(),
92            component_type,
93            component_id: component_id.into(),
94        }
95    }
96
97    /// Create a key from string representation (for backwards compatibility).
98    /// Format: "instance_id:component_type:component_id" or just "component_id" for legacy.
99    pub fn from_str_key(key: &str) -> Option<Self> {
100        let parts: Vec<&str> = key.split(':').collect();
101        match parts.len() {
102            1 => None, // Legacy single-part key, can't reconstruct
103            3 => {
104                let component_type = match parts[1].to_lowercase().as_str() {
105                    "source" => ComponentType::Source,
106                    "query" => ComponentType::Query,
107                    "reaction" => ComponentType::Reaction,
108                    _ => return None,
109                };
110                Some(Self {
111                    instance_id: parts[0].to_string(),
112                    component_type,
113                    component_id: parts[2].to_string(),
114                })
115            }
116            _ => None,
117        }
118    }
119
120    /// Convert to string key for HashMap storage.
121    pub fn to_string_key(&self) -> String {
122        let type_str = match self.component_type {
123            ComponentType::Source => "source",
124            ComponentType::Query => "query",
125            ComponentType::Reaction => "reaction",
126        };
127        format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
128    }
129}
130
131impl std::fmt::Display for ComponentLogKey {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(f, "{}", self.to_string_key())
134    }
135}
136
/// Capacity given to each component's broadcast channel (used for live log
/// streaming) when the registry is built with default settings.
pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
139
140/// Log severity level.
141///
142/// Follows standard log level conventions, from most verbose (Trace) to
143/// least verbose (Error).
144#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
145pub enum LogLevel {
146    /// Very detailed tracing information
147    Trace,
148    /// Debugging information
149    Debug,
150    /// General informational messages
151    Info,
152    /// Warning messages
153    Warn,
154    /// Error messages
155    Error,
156}
157
158impl std::fmt::Display for LogLevel {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        match self {
161            LogLevel::Trace => write!(f, "TRACE"),
162            LogLevel::Debug => write!(f, "DEBUG"),
163            LogLevel::Info => write!(f, "INFO"),
164            LogLevel::Warn => write!(f, "WARN"),
165            LogLevel::Error => write!(f, "ERROR"),
166        }
167    }
168}
169
170/// A structured log message from a component.
171///
172/// Contains the log content along with metadata about when and where
173/// the log was generated.
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct LogMessage {
176    /// Timestamp when the log was emitted
177    pub timestamp: DateTime<Utc>,
178    /// Severity level of the log
179    pub level: LogLevel,
180    /// The log message content
181    pub message: String,
182    /// ID of the DrasiLib instance that owns the component
183    pub instance_id: String,
184    /// ID of the component that emitted the log
185    pub component_id: String,
186    /// Type of the component (Source, Query, Reaction)
187    pub component_type: ComponentType,
188}
189
190impl LogMessage {
191    /// Create a new log message with the current timestamp.
192    pub fn new(
193        level: LogLevel,
194        message: impl Into<String>,
195        component_id: impl Into<String>,
196        component_type: ComponentType,
197    ) -> Self {
198        Self::with_instance(level, message, "", component_id, component_type)
199    }
200
201    /// Create a new log message with instance ID.
202    pub fn with_instance(
203        level: LogLevel,
204        message: impl Into<String>,
205        instance_id: impl Into<String>,
206        component_id: impl Into<String>,
207        component_type: ComponentType,
208    ) -> Self {
209        Self {
210            timestamp: Utc::now(),
211            level,
212            message: message.into(),
213            instance_id: instance_id.into(),
214            component_id: component_id.into(),
215            component_type,
216        }
217    }
218
219    /// Get the composite key for this log message.
220    pub fn key(&self) -> ComponentLogKey {
221        ComponentLogKey::new(
222            self.instance_id.clone(),
223            self.component_type.clone(),
224            self.component_id.clone(),
225        )
226    }
227}
228
229/// Per-component log storage and broadcast channel.
230struct ComponentLogChannel {
231    /// Recent log history
232    history: VecDeque<LogMessage>,
233    /// Maximum history size
234    max_history: usize,
235    /// Broadcast sender for live streaming
236    sender: broadcast::Sender<LogMessage>,
237}
238
239impl ComponentLogChannel {
240    fn new(max_history: usize, channel_capacity: usize) -> Self {
241        let (sender, _) = broadcast::channel(channel_capacity);
242        Self {
243            history: VecDeque::with_capacity(max_history),
244            max_history,
245            sender,
246        }
247    }
248
249    fn log(&mut self, message: LogMessage) {
250        // Add to history
251        if self.history.len() >= self.max_history {
252            self.history.pop_front();
253        }
254        self.history.push_back(message.clone());
255
256        // Broadcast to live subscribers (ignore if no subscribers)
257        let _ = self.sender.send(message);
258    }
259
260    fn get_history(&self) -> Vec<LogMessage> {
261        self.history.iter().cloned().collect()
262    }
263
264    fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
265        self.sender.subscribe()
266    }
267}
268
269/// Central registry for component log channels.
270///
271/// Manages per-component log storage and broadcast channels for live streaming.
272/// This is typically owned by `DrasiLib` and shared across all managers.
273pub struct ComponentLogRegistry {
274    /// Log channels per component ID
275    channels: RwLock<HashMap<String, ComponentLogChannel>>,
276    /// Maximum log history per component
277    max_history: usize,
278    /// Broadcast channel capacity
279    channel_capacity: usize,
280}
281
282impl std::fmt::Debug for ComponentLogRegistry {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        f.debug_struct("ComponentLogRegistry")
285            .field("max_history", &self.max_history)
286            .field("channel_capacity", &self.channel_capacity)
287            .finish()
288    }
289}
290
291impl Default for ComponentLogRegistry {
292    fn default() -> Self {
293        Self::new()
294    }
295}
296
297impl ComponentLogRegistry {
298    /// Create a new registry with default settings.
299    pub fn new() -> Self {
300        Self {
301            channels: RwLock::new(HashMap::new()),
302            max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
303            channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
304        }
305    }
306
307    /// Create a new registry with custom settings.
308    pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
309        Self {
310            channels: RwLock::new(HashMap::new()),
311            max_history,
312            channel_capacity,
313        }
314    }
315
316    /// Log a message for a component.
317    ///
318    /// Creates the component's channel if it doesn't exist.
319    /// Uses composite key (instance_id:component_type:component_id) for storage.
320    pub async fn log(&self, message: LogMessage) {
321        let key = message.key().to_string_key();
322        let mut channels = self.channels.write().await;
323        let channel = channels
324            .entry(key)
325            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
326        channel.log(message);
327    }
328
329    /// Get the log history for a component using composite key.
330    ///
331    /// Returns an empty vector if the component has no logs.
332    pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
333        let channels = self.channels.read().await;
334        channels
335            .get(&key.to_string_key())
336            .map(|c| c.get_history())
337            .unwrap_or_default()
338    }
339
340    /// Get the log history for a component (legacy API, uses empty instance_id).
341    ///
342    /// Returns an empty vector if the component has no logs.
343    #[deprecated(note = "Use get_history_by_key with ComponentLogKey for instance isolation")]
344    pub async fn get_history(&self, component_id: &str) -> Vec<LogMessage> {
345        let channels = self.channels.read().await;
346        channels
347            .get(component_id)
348            .map(|c| c.get_history())
349            .unwrap_or_default()
350    }
351
352    /// Subscribe to live logs for a component using composite key.
353    ///
354    /// Returns the current history and a broadcast receiver for new logs.
355    /// Creates the component's channel if it doesn't exist.
356    pub async fn subscribe_by_key(
357        &self,
358        key: &ComponentLogKey,
359    ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
360        let mut channels = self.channels.write().await;
361        let channel = channels
362            .entry(key.to_string_key())
363            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
364
365        let history = channel.get_history();
366        let receiver = channel.subscribe();
367        (history, receiver)
368    }
369
370    /// Subscribe to live logs for a component (legacy API).
371    ///
372    /// Returns the current history and a broadcast receiver for new logs.
373    /// Creates the component's channel if it doesn't exist.
374    #[deprecated(note = "Use subscribe_by_key with ComponentLogKey for instance isolation")]
375    pub async fn subscribe(
376        &self,
377        component_id: &str,
378    ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
379        let mut channels = self.channels.write().await;
380        let channel = channels
381            .entry(component_id.to_string())
382            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
383
384        let history = channel.get_history();
385        let receiver = channel.subscribe();
386        (history, receiver)
387    }
388
389    /// Remove a component's log channel using composite key.
390    ///
391    /// Called when a component is deleted to clean up resources.
392    pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
393        self.channels.write().await.remove(&key.to_string_key());
394    }
395
396    /// Remove a component's log channel (legacy API).
397    ///
398    /// Called when a component is deleted to clean up resources.
399    #[deprecated(note = "Use remove_component_by_key with ComponentLogKey for instance isolation")]
400    pub async fn remove_component(&self, component_id: &str) {
401        self.channels.write().await.remove(component_id);
402    }
403
404    /// Get the number of log messages stored for a component using composite key.
405    pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
406        self.channels
407            .read()
408            .await
409            .get(&key.to_string_key())
410            .map(|c| c.history.len())
411            .unwrap_or(0)
412    }
413
414    /// Get the number of log messages stored for a component (legacy API).
415    #[deprecated(note = "Use log_count_by_key with ComponentLogKey for instance isolation")]
416    pub async fn log_count(&self, component_id: &str) -> usize {
417        self.channels
418            .read()
419            .await
420            .get(component_id)
421            .map(|c| c.history.len())
422            .unwrap_or(0)
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, timeout, Duration};

    /// Shorthand for building a composite key.
    fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
        ComponentLogKey::new(instance, component_type, component)
    }

    /// Shorthand for building an `Info`-level message.
    fn info_msg(
        text: &str,
        instance: &str,
        component: &str,
        component_type: ComponentType,
    ) -> LogMessage {
        LogMessage::with_instance(LogLevel::Info, text, instance, component, component_type)
    }

    #[tokio::test]
    async fn test_log_and_get_history() {
        let registry = ComponentLogRegistry::new();

        let msg1 = info_msg("First message", "instance1", "source1", ComponentType::Source);
        let msg2 = LogMessage::with_instance(
            LogLevel::Error,
            "Second message",
            "instance1",
            "source1",
            ComponentType::Source,
        );

        registry.log(msg1).await;
        registry.log(msg2).await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        let history = registry.get_history_by_key(&key).await;
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].message, "First message");
        assert_eq!(history[1].message, "Second message");
        assert_eq!(history[1].level, LogLevel::Error);
    }

    #[tokio::test]
    async fn test_max_history_limit() {
        let registry = ComponentLogRegistry::with_capacity(3, 10);

        for i in 0..5 {
            let msg = info_msg(
                &format!("Message {i}"),
                "instance1",
                "source1",
                ComponentType::Source,
            );
            registry.log(msg).await;
        }

        let key = make_key("instance1", ComponentType::Source, "source1");
        let history = registry.get_history_by_key(&key).await;
        assert_eq!(history.len(), 3);
        // Should have messages 2, 3, 4 (oldest removed)
        assert_eq!(history[0].message, "Message 2");
        assert_eq!(history[1].message, "Message 3");
        assert_eq!(history[2].message, "Message 4");
    }

    #[tokio::test]
    async fn test_subscribe_gets_history_and_live() {
        let registry = Arc::new(ComponentLogRegistry::new());

        // Log some history first
        registry
            .log(info_msg("History 1", "instance1", "source1", ComponentType::Source))
            .await;

        // Subscribe
        let key = make_key("instance1", ComponentType::Source, "source1");
        let (history, mut receiver) = registry.subscribe_by_key(&key).await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].message, "History 1");

        // Log a new message after subscribing
        let registry_clone = registry.clone();
        let producer = tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            registry_clone
                .log(info_msg("Live message", "instance1", "source1", ComponentType::Source))
                .await;
        });

        // Should receive the live message. Bound the wait so that a delivery
        // failure fails the test instead of hanging it.
        let live_msg = timeout(Duration::from_secs(5), receiver.recv())
            .await
            .expect("timed out waiting for the live message")
            .expect("log channel closed before the live message arrived");
        assert_eq!(live_msg.message, "Live message");

        producer.await.expect("producer task panicked");
    }

    #[tokio::test]
    async fn test_remove_component() {
        let registry = ComponentLogRegistry::new();

        registry
            .log(info_msg("Test", "instance1", "source1", ComponentType::Source))
            .await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        assert_eq!(registry.log_count_by_key(&key).await, 1);

        registry.remove_component_by_key(&key).await;

        assert_eq!(registry.log_count_by_key(&key).await, 0);
    }

    #[tokio::test]
    async fn test_multiple_components() {
        let registry = ComponentLogRegistry::new();

        registry
            .log(info_msg("Source log", "instance1", "source1", ComponentType::Source))
            .await;
        registry
            .log(info_msg("Query log", "instance1", "query1", ComponentType::Query))
            .await;

        let source_key = make_key("instance1", ComponentType::Source, "source1");
        let query_key = make_key("instance1", ComponentType::Query, "query1");

        let source_history = registry.get_history_by_key(&source_key).await;
        let query_history = registry.get_history_by_key(&query_key).await;

        assert_eq!(source_history.len(), 1);
        assert_eq!(query_history.len(), 1);
        assert_eq!(source_history[0].component_type, ComponentType::Source);
        assert_eq!(query_history[0].component_type, ComponentType::Query);
    }

    #[tokio::test]
    async fn test_instance_isolation() {
        // Test that different instances with the same component ID are isolated
        let registry = ComponentLogRegistry::new();

        // Same component ID, different instances
        registry
            .log(info_msg("Instance 1 log", "instance1", "my-source", ComponentType::Source))
            .await;
        registry
            .log(info_msg("Instance 2 log", "instance2", "my-source", ComponentType::Source))
            .await;

        let key1 = make_key("instance1", ComponentType::Source, "my-source");
        let key2 = make_key("instance2", ComponentType::Source, "my-source");

        let history1 = registry.get_history_by_key(&key1).await;
        let history2 = registry.get_history_by_key(&key2).await;

        // Each instance should only see its own logs
        assert_eq!(history1.len(), 1);
        assert_eq!(history2.len(), 1);
        assert_eq!(history1[0].message, "Instance 1 log");
        assert_eq!(history2[0].message, "Instance 2 log");
    }

    #[test]
    fn test_component_log_key() {
        let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");
        assert_eq!(key.to_string_key(), "my-instance:source:my-source");
        assert_eq!(key.instance_id, "my-instance");
        assert_eq!(key.component_type, ComponentType::Source);
        assert_eq!(key.component_id, "my-source");
    }

    #[test]
    fn test_component_log_key_string_round_trip() {
        let key = ComponentLogKey::new("my-instance", ComponentType::Reaction, "my-reaction");
        let text = key.to_string_key();
        assert_eq!(text, "my-instance:reaction:my-reaction");
        // Display renders the same string as `to_string_key`.
        assert_eq!(key.to_string(), text);
        assert_eq!(ComponentLogKey::from_str_key(&text), Some(key));

        // The component type is matched case-insensitively.
        assert_eq!(
            ComponentLogKey::from_str_key("inst:SOURCE:s1"),
            Some(ComponentLogKey::new("inst", ComponentType::Source, "s1"))
        );
    }

    #[test]
    fn test_from_str_key_rejects_malformed() {
        // Legacy single-part key cannot be reconstructed.
        assert_eq!(ComponentLogKey::from_str_key("my-source"), None);
        // Too few parts.
        assert_eq!(ComponentLogKey::from_str_key("instance:source"), None);
        // Unknown component type.
        assert_eq!(ComponentLogKey::from_str_key("instance:widget:x"), None);
    }

    #[test]
    fn test_log_level_ordering() {
        assert!(LogLevel::Trace < LogLevel::Debug);
        assert!(LogLevel::Debug < LogLevel::Info);
        assert!(LogLevel::Info < LogLevel::Warn);
        assert!(LogLevel::Warn < LogLevel::Error);
    }

    #[test]
    fn test_log_level_display() {
        assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
        assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
        assert_eq!(format!("{}", LogLevel::Info), "INFO");
        assert_eq!(format!("{}", LogLevel::Warn), "WARN");
        assert_eq!(format!("{}", LogLevel::Error), "ERROR");
    }
}