Skip to main content

drasi_lib/managers/
component_log.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Component logging infrastructure for live log streaming.
16//!
17//! This module provides the storage and broadcast infrastructure for component logs.
18//! Logs are captured by the tracing layer (`ComponentLogLayer`) and routed to
19//! per-component streams that can be subscribed to by clients.
20//!
21//! # Architecture
22//!
23//! - `ComponentLogRegistry`: Central registry that manages log channels and history
24//! - `LogMessage`: Structured log message with timestamp, level, and metadata
25//! - `LogLevel`: Log severity levels (Trace, Debug, Info, Warn, Error)
26//!
27//! # Usage
28//!
29//! Log capture is automatic when code runs within a tracing span that has
30//! `component_id` and `component_type` attributes:
31//!
32//! ```ignore
33//! use tracing::Instrument;
34//!
35//! let span = tracing::info_span!(
36//!     "source",
37//!     component_id = %source_id,
38//!     component_type = "source"
39//! );
40//!
41//! async {
42//!     // Both of these are captured:
43//!     tracing::info!("Starting source");
44//!     log::info!("Also captured via tracing-log bridge");
45//! }.instrument(span).await;
46//! ```
47//!
48//! Subscribers can stream logs from a component:
49//!
50//! ```ignore
51//! let (history, mut receiver) = core.subscribe_source_logs("my-source").await?;
52//! while let Ok(log) = receiver.recv().await {
53//!     println!("[{}] {}: {}", log.level, log.component_id, log.message);
54//! }
55//! ```
56
57use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use chrono::{DateTime, Utc};
61use serde::{Deserialize, Serialize};
62use tokio::sync::{broadcast, RwLock};
63
64use crate::channels::ComponentType;
65
/// Default cap on the number of log messages kept in each component's history.
///
/// This is the history limit applied by [`ComponentLogRegistry::new`].
pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
68
69/// Composite key for identifying a component's log channel.
70///
71/// This ensures logs from different DrasiLib instances with the same component ID
72/// are kept separate.
73#[derive(Debug, Clone, PartialEq, Eq, Hash)]
74pub struct ComponentLogKey {
75    /// The DrasiLib instance ID (or empty string for legacy/default behavior)
76    pub instance_id: String,
77    /// The type of component (Source, Query, Reaction)
78    pub component_type: ComponentType,
79    /// The component's ID within its DrasiLib instance
80    pub component_id: String,
81}
82
83impl ComponentLogKey {
84    /// Create a new composite key.
85    pub fn new(
86        instance_id: impl Into<String>,
87        component_type: ComponentType,
88        component_id: impl Into<String>,
89    ) -> Self {
90        Self {
91            instance_id: instance_id.into(),
92            component_type,
93            component_id: component_id.into(),
94        }
95    }
96
97    /// Create a key from string representation (for backwards compatibility).
98    /// Format: "instance_id:component_type:component_id" or just "component_id" for legacy.
99    pub fn from_str_key(key: &str) -> Option<Self> {
100        let parts: Vec<&str> = key.split(':').collect();
101        match parts.len() {
102            1 => None, // Legacy single-part key, can't reconstruct
103            3 => {
104                let component_type = match parts[1].to_lowercase().as_str() {
105                    "source" => ComponentType::Source,
106                    "query" => ComponentType::Query,
107                    "reaction" => ComponentType::Reaction,
108                    _ => return None,
109                };
110                Some(Self {
111                    instance_id: parts[0].to_string(),
112                    component_type,
113                    component_id: parts[2].to_string(),
114                })
115            }
116            _ => None,
117        }
118    }
119
120    /// Convert to string key for HashMap storage.
121    pub fn to_string_key(&self) -> String {
122        let type_str = match self.component_type {
123            ComponentType::Source => "source",
124            ComponentType::Query => "query",
125            ComponentType::Reaction => "reaction",
126            ComponentType::BootstrapProvider => "bootstrap_provider",
127            ComponentType::IdentityProvider => "identity_provider",
128        };
129        format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
130    }
131}
132
133impl std::fmt::Display for ComponentLogKey {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        write!(f, "{}", self.to_string_key())
136    }
137}
138
/// Default capacity of each component's broadcast channel for live log streaming.
///
/// This is the channel capacity applied by [`ComponentLogRegistry::new`].
pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
141
142/// Log severity level.
143///
144/// Follows standard log level conventions, from most verbose (Trace) to
145/// least verbose (Error).
146#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
147pub enum LogLevel {
148    /// Very detailed tracing information
149    Trace,
150    /// Debugging information
151    Debug,
152    /// General informational messages
153    Info,
154    /// Warning messages
155    Warn,
156    /// Error messages
157    Error,
158}
159
160impl std::fmt::Display for LogLevel {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            LogLevel::Trace => write!(f, "TRACE"),
164            LogLevel::Debug => write!(f, "DEBUG"),
165            LogLevel::Info => write!(f, "INFO"),
166            LogLevel::Warn => write!(f, "WARN"),
167            LogLevel::Error => write!(f, "ERROR"),
168        }
169    }
170}
171
172/// A structured log message from a component.
173///
174/// Contains the log content along with metadata about when and where
175/// the log was generated.
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct LogMessage {
178    /// Timestamp when the log was emitted
179    pub timestamp: DateTime<Utc>,
180    /// Severity level of the log
181    pub level: LogLevel,
182    /// The log message content
183    pub message: String,
184    /// ID of the DrasiLib instance that owns the component
185    pub instance_id: String,
186    /// ID of the component that emitted the log
187    pub component_id: String,
188    /// Type of the component (Source, Query, Reaction)
189    pub component_type: ComponentType,
190}
191
192impl LogMessage {
193    /// Create a new log message with the current timestamp.
194    pub fn new(
195        level: LogLevel,
196        message: impl Into<String>,
197        component_id: impl Into<String>,
198        component_type: ComponentType,
199    ) -> Self {
200        Self::with_instance(level, message, "", component_id, component_type)
201    }
202
203    /// Create a new log message with instance ID.
204    pub fn with_instance(
205        level: LogLevel,
206        message: impl Into<String>,
207        instance_id: impl Into<String>,
208        component_id: impl Into<String>,
209        component_type: ComponentType,
210    ) -> Self {
211        Self {
212            timestamp: Utc::now(),
213            level,
214            message: message.into(),
215            instance_id: instance_id.into(),
216            component_id: component_id.into(),
217            component_type,
218        }
219    }
220
221    /// Get the composite key for this log message.
222    pub fn key(&self) -> ComponentLogKey {
223        ComponentLogKey::new(
224            self.instance_id.clone(),
225            self.component_type.clone(),
226            self.component_id.clone(),
227        )
228    }
229}
230
231/// Per-component log storage and broadcast channel.
232struct ComponentLogChannel {
233    /// Recent log history
234    history: VecDeque<LogMessage>,
235    /// Maximum history size
236    max_history: usize,
237    /// Broadcast sender for live streaming
238    sender: broadcast::Sender<LogMessage>,
239}
240
241impl ComponentLogChannel {
242    fn new(max_history: usize, channel_capacity: usize) -> Self {
243        let (sender, _) = broadcast::channel(channel_capacity);
244        Self {
245            history: VecDeque::with_capacity(max_history),
246            max_history,
247            sender,
248        }
249    }
250
251    fn log(&mut self, message: LogMessage) {
252        // Add to history
253        if self.history.len() >= self.max_history {
254            self.history.pop_front();
255        }
256        self.history.push_back(message.clone());
257
258        // Broadcast to live subscribers (ignore if no subscribers)
259        let _ = self.sender.send(message);
260    }
261
262    fn get_history(&self) -> Vec<LogMessage> {
263        self.history.iter().cloned().collect()
264    }
265
266    fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
267        self.sender.subscribe()
268    }
269}
270
271/// Central registry for component log channels.
272///
273/// Manages per-component log storage and broadcast channels for live streaming.
274/// This is typically owned by `DrasiLib` and shared across all managers.
275pub struct ComponentLogRegistry {
276    /// Log channels per component ID
277    channels: RwLock<HashMap<String, ComponentLogChannel>>,
278    /// Maximum log history per component
279    max_history: usize,
280    /// Broadcast channel capacity
281    channel_capacity: usize,
282}
283
284impl std::fmt::Debug for ComponentLogRegistry {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        f.debug_struct("ComponentLogRegistry")
287            .field("max_history", &self.max_history)
288            .field("channel_capacity", &self.channel_capacity)
289            .finish()
290    }
291}
292
293impl Default for ComponentLogRegistry {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
299impl ComponentLogRegistry {
300    /// Create a new registry with default settings.
301    pub fn new() -> Self {
302        Self {
303            channels: RwLock::new(HashMap::new()),
304            max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
305            channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
306        }
307    }
308
309    /// Create a new registry with custom settings.
310    pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
311        Self {
312            channels: RwLock::new(HashMap::new()),
313            max_history,
314            channel_capacity,
315        }
316    }
317
318    /// Log a message for a component.
319    ///
320    /// Creates the component's channel if it doesn't exist.
321    /// Uses composite key (instance_id:component_type:component_id) for storage.
322    pub async fn log(&self, message: LogMessage) {
323        let key = message.key().to_string_key();
324        let mut channels = self.channels.write().await;
325        let channel = channels
326            .entry(key)
327            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
328        channel.log(message);
329    }
330
331    /// Non-blocking log: tries to acquire the write lock without waiting.
332    ///
333    /// Returns `true` if the message was logged, `false` if the lock was held.
334    /// Use this from synchronous contexts (e.g., FFI callbacks) where blocking
335    /// on an async lock would stall the runtime scheduler.
336    pub fn try_log(&self, message: LogMessage) -> bool {
337        match self.channels.try_write() {
338            Ok(mut channels) => {
339                let key = message.key().to_string_key();
340                let channel = channels.entry(key).or_insert_with(|| {
341                    ComponentLogChannel::new(self.max_history, self.channel_capacity)
342                });
343                channel.log(message);
344                true
345            }
346            Err(_) => false,
347        }
348    }
349
350    /// Get the log history for a component using composite key.
351    ///
352    /// Returns an empty vector if the component has no logs.
353    pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
354        let channels = self.channels.read().await;
355        channels
356            .get(&key.to_string_key())
357            .map(|c| c.get_history())
358            .unwrap_or_default()
359    }
360
361    /// Subscribe to live logs for a component using composite key.
362    ///
363    /// Returns the current history and a broadcast receiver for new logs.
364    /// Creates the component's channel if it doesn't exist.
365    pub async fn subscribe_by_key(
366        &self,
367        key: &ComponentLogKey,
368    ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
369        let mut channels = self.channels.write().await;
370        let channel = channels
371            .entry(key.to_string_key())
372            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
373
374        let history = channel.get_history();
375        let receiver = channel.subscribe();
376        (history, receiver)
377    }
378
379    /// Remove a component's log channel using composite key.
380    ///
381    /// Called when a component is deleted to clean up resources.
382    pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
383        self.channels.write().await.remove(&key.to_string_key());
384    }
385
386    /// Get the number of log messages stored for a component using composite key.
387    pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
388        self.channels
389            .read()
390            .await
391            .get(&key.to_string_key())
392            .map(|c| c.history.len())
393            .unwrap_or(0)
394    }
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the composite key used by the lookups below.
    fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
        ComponentLogKey::new(instance, component_type, component)
    }

    /// Shorthand for a message owned by `instance`/`component`.
    fn make_msg(
        level: LogLevel,
        text: &str,
        instance: &str,
        component: &str,
        component_type: ComponentType,
    ) -> LogMessage {
        LogMessage::with_instance(level, text, instance, component, component_type)
    }

    #[tokio::test]
    async fn test_log_and_get_history() {
        let registry = ComponentLogRegistry::new();

        let first = make_msg(
            LogLevel::Info,
            "First message",
            "instance1",
            "source1",
            ComponentType::Source,
        );
        let second = make_msg(
            LogLevel::Error,
            "Second message",
            "instance1",
            "source1",
            ComponentType::Source,
        );
        registry.log(first).await;
        registry.log(second).await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        let history = registry.get_history_by_key(&key).await;

        // Messages come back in the order they were logged.
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].message, "First message");
        assert_eq!(history[1].message, "Second message");
        assert_eq!(history[1].level, LogLevel::Error);
    }

    #[tokio::test]
    async fn test_max_history_limit() {
        // History limit of 3, broadcast capacity of 10.
        let registry = ComponentLogRegistry::with_capacity(3, 10);

        for i in 0..5 {
            let text = format!("Message {i}");
            let msg = make_msg(
                LogLevel::Info,
                &text,
                "instance1",
                "source1",
                ComponentType::Source,
            );
            registry.log(msg).await;
        }

        let key = make_key("instance1", ComponentType::Source, "source1");
        let history = registry.get_history_by_key(&key).await;

        // Messages 0 and 1 were evicted; 2, 3 and 4 remain.
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].message, "Message 2");
        assert_eq!(history[2].message, "Message 4");
    }

    #[tokio::test]
    async fn test_subscribe_gets_history_and_live() {
        let registry = Arc::new(ComponentLogRegistry::new());

        // Logged before anyone subscribes, so it must come back as history.
        let earlier = make_msg(
            LogLevel::Info,
            "History 1",
            "instance1",
            "source1",
            ComponentType::Source,
        );
        registry.log(earlier).await;

        let key = make_key("instance1", ComponentType::Source, "source1");
        let (history, mut receiver) = registry.subscribe_by_key(&key).await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].message, "History 1");

        // Logged from another task after the subscription exists.
        let producer = registry.clone();
        tokio::spawn(async move {
            tokio::task::yield_now().await;
            let later = make_msg(
                LogLevel::Info,
                "Live message",
                "instance1",
                "source1",
                ComponentType::Source,
            );
            producer.log(later).await;
        });

        // The receiver must deliver the message logged after subscribing.
        let live_msg = receiver.recv().await.unwrap();
        assert_eq!(live_msg.message, "Live message");
    }

    #[tokio::test]
    async fn test_remove_component() {
        let registry = ComponentLogRegistry::new();
        let key = make_key("instance1", ComponentType::Source, "source1");

        let msg = make_msg(
            LogLevel::Info,
            "Test",
            "instance1",
            "source1",
            ComponentType::Source,
        );
        registry.log(msg).await;
        assert_eq!(registry.log_count_by_key(&key).await, 1);

        // After removal the component reports no stored messages.
        registry.remove_component_by_key(&key).await;
        assert_eq!(registry.log_count_by_key(&key).await, 0);
    }

    #[tokio::test]
    async fn test_multiple_components() {
        let registry = ComponentLogRegistry::new();

        let from_source = make_msg(
            LogLevel::Info,
            "Source log",
            "instance1",
            "source1",
            ComponentType::Source,
        );
        let from_query = make_msg(
            LogLevel::Info,
            "Query log",
            "instance1",
            "query1",
            ComponentType::Query,
        );
        registry.log(from_source).await;
        registry.log(from_query).await;

        let source_key = make_key("instance1", ComponentType::Source, "source1");
        let query_key = make_key("instance1", ComponentType::Query, "query1");
        let source_history = registry.get_history_by_key(&source_key).await;
        let query_history = registry.get_history_by_key(&query_key).await;

        // Each component only holds its own message.
        assert_eq!(source_history.len(), 1);
        assert_eq!(query_history.len(), 1);
        assert_eq!(source_history[0].component_type, ComponentType::Source);
        assert_eq!(query_history[0].component_type, ComponentType::Query);
    }

    #[tokio::test]
    async fn test_instance_isolation() {
        // Two instances use the same component ID; their logs must not mix.
        let registry = ComponentLogRegistry::new();

        let from_first = make_msg(
            LogLevel::Info,
            "Instance 1 log",
            "instance1",
            "my-source",
            ComponentType::Source,
        );
        let from_second = make_msg(
            LogLevel::Info,
            "Instance 2 log",
            "instance2",
            "my-source",
            ComponentType::Source,
        );
        registry.log(from_first).await;
        registry.log(from_second).await;

        let key1 = make_key("instance1", ComponentType::Source, "my-source");
        let key2 = make_key("instance2", ComponentType::Source, "my-source");
        let history1 = registry.get_history_by_key(&key1).await;
        let history2 = registry.get_history_by_key(&key2).await;

        // Each instance sees exactly its own log line.
        assert_eq!(history1.len(), 1);
        assert_eq!(history2.len(), 1);
        assert_eq!(history1[0].message, "Instance 1 log");
        assert_eq!(history2[0].message, "Instance 2 log");
    }

    #[test]
    fn test_component_log_key() {
        let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");

        assert_eq!(key.to_string_key(), "my-instance:source:my-source");
        assert_eq!(key.instance_id, "my-instance");
        assert_eq!(key.component_type, ComponentType::Source);
        assert_eq!(key.component_id, "my-source");
    }

    #[test]
    fn test_log_level_ordering() {
        // Severity increases from Trace to Error.
        assert!(LogLevel::Trace < LogLevel::Debug);
        assert!(LogLevel::Debug < LogLevel::Info);
        assert!(LogLevel::Info < LogLevel::Warn);
        assert!(LogLevel::Warn < LogLevel::Error);
    }

    #[test]
    fn test_log_level_display() {
        assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
        assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
        assert_eq!(format!("{}", LogLevel::Info), "INFO");
        assert_eq!(format!("{}", LogLevel::Warn), "WARN");
        assert_eq!(format!("{}", LogLevel::Error), "ERROR");
    }
}