Skip to main content

drasi_lib/managers/
component_log.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Component logging infrastructure for live log streaming.
16//!
17//! This module provides the storage and broadcast infrastructure for component logs.
18//! Logs are captured by the tracing layer (`ComponentLogLayer`) and routed to
19//! per-component streams that can be subscribed to by clients.
20//!
21//! # Architecture
22//!
23//! - `ComponentLogRegistry`: Central registry that manages log channels and history
24//! - `LogMessage`: Structured log message with timestamp, level, and metadata
25//! - `LogLevel`: Log severity levels (Trace, Debug, Info, Warn, Error)
26//!
27//! # Usage
28//!
29//! Log capture is automatic when code runs within a tracing span that has
30//! `component_id` and `component_type` attributes:
31//!
32//! ```ignore
33//! use tracing::Instrument;
34//!
35//! let span = tracing::info_span!(
36//!     "source",
37//!     component_id = %source_id,
38//!     component_type = "source"
39//! );
40//!
41//! async {
42//!     // Both of these are captured:
43//!     tracing::info!("Starting source");
44//!     log::info!("Also captured via tracing-log bridge");
45//! }.instrument(span).await;
46//! ```
47//!
48//! Subscribers can stream logs from a component:
49//!
50//! ```ignore
51//! let (history, mut receiver) = core.subscribe_source_logs("my-source").await?;
52//! while let Ok(log) = receiver.recv().await {
53//!     println!("[{}] {}: {}", log.level, log.component_id, log.message);
54//! }
55//! ```
56
57use std::collections::{HashMap, VecDeque};
58use std::sync::Arc;
59
60use chrono::{DateTime, Utc};
61use serde::{Deserialize, Serialize};
62use tokio::sync::{broadcast, RwLock};
63
64use crate::channels::ComponentType;
65
66/// Default maximum number of log messages to retain per component.
67pub const DEFAULT_MAX_LOGS_PER_COMPONENT: usize = 100;
68
69/// Composite key for identifying a component's log channel.
70///
71/// This ensures logs from different DrasiLib instances with the same component ID
72/// are kept separate.
73#[derive(Debug, Clone, PartialEq, Eq, Hash)]
74pub struct ComponentLogKey {
75    /// The DrasiLib instance ID (or empty string for legacy/default behavior)
76    pub instance_id: String,
77    /// The type of component (Source, Query, Reaction)
78    pub component_type: ComponentType,
79    /// The component's ID within its DrasiLib instance
80    pub component_id: String,
81}
82
83impl ComponentLogKey {
84    /// Create a new composite key.
85    pub fn new(
86        instance_id: impl Into<String>,
87        component_type: ComponentType,
88        component_id: impl Into<String>,
89    ) -> Self {
90        Self {
91            instance_id: instance_id.into(),
92            component_type,
93            component_id: component_id.into(),
94        }
95    }
96
97    /// Create a key from string representation (for backwards compatibility).
98    /// Format: "instance_id:component_type:component_id" or just "component_id" for legacy.
99    pub fn from_str_key(key: &str) -> Option<Self> {
100        let parts: Vec<&str> = key.split(':').collect();
101        match parts.len() {
102            1 => None, // Legacy single-part key, can't reconstruct
103            3 => {
104                let component_type = match parts[1].to_lowercase().as_str() {
105                    "source" => ComponentType::Source,
106                    "query" => ComponentType::Query,
107                    "reaction" => ComponentType::Reaction,
108                    _ => return None,
109                };
110                Some(Self {
111                    instance_id: parts[0].to_string(),
112                    component_type,
113                    component_id: parts[2].to_string(),
114                })
115            }
116            _ => None,
117        }
118    }
119
120    /// Convert to string key for HashMap storage.
121    pub fn to_string_key(&self) -> String {
122        let type_str = match self.component_type {
123            ComponentType::Source => "source",
124            ComponentType::Query => "query",
125            ComponentType::Reaction => "reaction",
126        };
127        format!("{}:{}:{}", self.instance_id, type_str, self.component_id)
128    }
129}
130
131impl std::fmt::Display for ComponentLogKey {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(f, "{}", self.to_string_key())
134    }
135}
136
137/// Default broadcast channel capacity for live log streaming.
138pub const DEFAULT_LOG_CHANNEL_CAPACITY: usize = 256;
139
140/// Log severity level.
141///
142/// Follows standard log level conventions, from most verbose (Trace) to
143/// least verbose (Error).
144#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
145pub enum LogLevel {
146    /// Very detailed tracing information
147    Trace,
148    /// Debugging information
149    Debug,
150    /// General informational messages
151    Info,
152    /// Warning messages
153    Warn,
154    /// Error messages
155    Error,
156}
157
158impl std::fmt::Display for LogLevel {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        match self {
161            LogLevel::Trace => write!(f, "TRACE"),
162            LogLevel::Debug => write!(f, "DEBUG"),
163            LogLevel::Info => write!(f, "INFO"),
164            LogLevel::Warn => write!(f, "WARN"),
165            LogLevel::Error => write!(f, "ERROR"),
166        }
167    }
168}
169
170/// A structured log message from a component.
171///
172/// Contains the log content along with metadata about when and where
173/// the log was generated.
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct LogMessage {
176    /// Timestamp when the log was emitted
177    pub timestamp: DateTime<Utc>,
178    /// Severity level of the log
179    pub level: LogLevel,
180    /// The log message content
181    pub message: String,
182    /// ID of the DrasiLib instance that owns the component
183    pub instance_id: String,
184    /// ID of the component that emitted the log
185    pub component_id: String,
186    /// Type of the component (Source, Query, Reaction)
187    pub component_type: ComponentType,
188}
189
190impl LogMessage {
191    /// Create a new log message with the current timestamp.
192    pub fn new(
193        level: LogLevel,
194        message: impl Into<String>,
195        component_id: impl Into<String>,
196        component_type: ComponentType,
197    ) -> Self {
198        Self::with_instance(level, message, "", component_id, component_type)
199    }
200
201    /// Create a new log message with instance ID.
202    pub fn with_instance(
203        level: LogLevel,
204        message: impl Into<String>,
205        instance_id: impl Into<String>,
206        component_id: impl Into<String>,
207        component_type: ComponentType,
208    ) -> Self {
209        Self {
210            timestamp: Utc::now(),
211            level,
212            message: message.into(),
213            instance_id: instance_id.into(),
214            component_id: component_id.into(),
215            component_type,
216        }
217    }
218
219    /// Get the composite key for this log message.
220    pub fn key(&self) -> ComponentLogKey {
221        ComponentLogKey::new(
222            self.instance_id.clone(),
223            self.component_type.clone(),
224            self.component_id.clone(),
225        )
226    }
227}
228
229/// Per-component log storage and broadcast channel.
230struct ComponentLogChannel {
231    /// Recent log history
232    history: VecDeque<LogMessage>,
233    /// Maximum history size
234    max_history: usize,
235    /// Broadcast sender for live streaming
236    sender: broadcast::Sender<LogMessage>,
237}
238
239impl ComponentLogChannel {
240    fn new(max_history: usize, channel_capacity: usize) -> Self {
241        let (sender, _) = broadcast::channel(channel_capacity);
242        Self {
243            history: VecDeque::with_capacity(max_history),
244            max_history,
245            sender,
246        }
247    }
248
249    fn log(&mut self, message: LogMessage) {
250        // Add to history
251        if self.history.len() >= self.max_history {
252            self.history.pop_front();
253        }
254        self.history.push_back(message.clone());
255
256        // Broadcast to live subscribers (ignore if no subscribers)
257        let _ = self.sender.send(message);
258    }
259
260    fn get_history(&self) -> Vec<LogMessage> {
261        self.history.iter().cloned().collect()
262    }
263
264    fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
265        self.sender.subscribe()
266    }
267}
268
269/// Central registry for component log channels.
270///
271/// Manages per-component log storage and broadcast channels for live streaming.
272/// This is typically owned by `DrasiLib` and shared across all managers.
273pub struct ComponentLogRegistry {
274    /// Log channels per component ID
275    channels: RwLock<HashMap<String, ComponentLogChannel>>,
276    /// Maximum log history per component
277    max_history: usize,
278    /// Broadcast channel capacity
279    channel_capacity: usize,
280}
281
282impl std::fmt::Debug for ComponentLogRegistry {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        f.debug_struct("ComponentLogRegistry")
285            .field("max_history", &self.max_history)
286            .field("channel_capacity", &self.channel_capacity)
287            .finish()
288    }
289}
290
291impl Default for ComponentLogRegistry {
292    fn default() -> Self {
293        Self::new()
294    }
295}
296
297impl ComponentLogRegistry {
298    /// Create a new registry with default settings.
299    pub fn new() -> Self {
300        Self {
301            channels: RwLock::new(HashMap::new()),
302            max_history: DEFAULT_MAX_LOGS_PER_COMPONENT,
303            channel_capacity: DEFAULT_LOG_CHANNEL_CAPACITY,
304        }
305    }
306
307    /// Create a new registry with custom settings.
308    pub fn with_capacity(max_history: usize, channel_capacity: usize) -> Self {
309        Self {
310            channels: RwLock::new(HashMap::new()),
311            max_history,
312            channel_capacity,
313        }
314    }
315
316    /// Log a message for a component.
317    ///
318    /// Creates the component's channel if it doesn't exist.
319    /// Uses composite key (instance_id:component_type:component_id) for storage.
320    pub async fn log(&self, message: LogMessage) {
321        let key = message.key().to_string_key();
322        let mut channels = self.channels.write().await;
323        let channel = channels
324            .entry(key)
325            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
326        channel.log(message);
327    }
328
329    /// Non-blocking log: tries to acquire the write lock without waiting.
330    ///
331    /// Returns `true` if the message was logged, `false` if the lock was held.
332    /// Use this from synchronous contexts (e.g., FFI callbacks) where blocking
333    /// on an async lock would stall the runtime scheduler.
334    pub fn try_log(&self, message: LogMessage) -> bool {
335        match self.channels.try_write() {
336            Ok(mut channels) => {
337                let key = message.key().to_string_key();
338                let channel = channels.entry(key).or_insert_with(|| {
339                    ComponentLogChannel::new(self.max_history, self.channel_capacity)
340                });
341                channel.log(message);
342                true
343            }
344            Err(_) => false,
345        }
346    }
347
348    /// Get the log history for a component using composite key.
349    ///
350    /// Returns an empty vector if the component has no logs.
351    pub async fn get_history_by_key(&self, key: &ComponentLogKey) -> Vec<LogMessage> {
352        let channels = self.channels.read().await;
353        channels
354            .get(&key.to_string_key())
355            .map(|c| c.get_history())
356            .unwrap_or_default()
357    }
358
359    /// Get the log history for a component (legacy API, uses empty instance_id).
360    ///
361    /// Returns an empty vector if the component has no logs.
362    #[deprecated(note = "Use get_history_by_key with ComponentLogKey for instance isolation")]
363    pub async fn get_history(&self, component_id: &str) -> Vec<LogMessage> {
364        let channels = self.channels.read().await;
365        channels
366            .get(component_id)
367            .map(|c| c.get_history())
368            .unwrap_or_default()
369    }
370
371    /// Subscribe to live logs for a component using composite key.
372    ///
373    /// Returns the current history and a broadcast receiver for new logs.
374    /// Creates the component's channel if it doesn't exist.
375    pub async fn subscribe_by_key(
376        &self,
377        key: &ComponentLogKey,
378    ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
379        let mut channels = self.channels.write().await;
380        let channel = channels
381            .entry(key.to_string_key())
382            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
383
384        let history = channel.get_history();
385        let receiver = channel.subscribe();
386        (history, receiver)
387    }
388
389    /// Subscribe to live logs for a component (legacy API).
390    ///
391    /// Returns the current history and a broadcast receiver for new logs.
392    /// Creates the component's channel if it doesn't exist.
393    #[deprecated(note = "Use subscribe_by_key with ComponentLogKey for instance isolation")]
394    pub async fn subscribe(
395        &self,
396        component_id: &str,
397    ) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
398        let mut channels = self.channels.write().await;
399        let channel = channels
400            .entry(component_id.to_string())
401            .or_insert_with(|| ComponentLogChannel::new(self.max_history, self.channel_capacity));
402
403        let history = channel.get_history();
404        let receiver = channel.subscribe();
405        (history, receiver)
406    }
407
408    /// Remove a component's log channel using composite key.
409    ///
410    /// Called when a component is deleted to clean up resources.
411    pub async fn remove_component_by_key(&self, key: &ComponentLogKey) {
412        self.channels.write().await.remove(&key.to_string_key());
413    }
414
415    /// Remove a component's log channel (legacy API).
416    ///
417    /// Called when a component is deleted to clean up resources.
418    #[deprecated(note = "Use remove_component_by_key with ComponentLogKey for instance isolation")]
419    pub async fn remove_component(&self, component_id: &str) {
420        self.channels.write().await.remove(component_id);
421    }
422
423    /// Get the number of log messages stored for a component using composite key.
424    pub async fn log_count_by_key(&self, key: &ComponentLogKey) -> usize {
425        self.channels
426            .read()
427            .await
428            .get(&key.to_string_key())
429            .map(|c| c.history.len())
430            .unwrap_or(0)
431    }
432
433    /// Get the number of log messages stored for a component (legacy API).
434    #[deprecated(note = "Use log_count_by_key with ComponentLogKey for instance isolation")]
435    pub async fn log_count(&self, component_id: &str) -> usize {
436        self.channels
437            .read()
438            .await
439            .get(component_id)
440            .map(|c| c.history.len())
441            .unwrap_or(0)
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448    use tokio::time::{sleep, Duration};
449
450    fn make_key(instance: &str, component_type: ComponentType, component: &str) -> ComponentLogKey {
451        ComponentLogKey::new(instance, component_type, component)
452    }
453
454    #[tokio::test]
455    async fn test_log_and_get_history() {
456        let registry = ComponentLogRegistry::new();
457
458        let msg1 = LogMessage::with_instance(
459            LogLevel::Info,
460            "First message",
461            "instance1",
462            "source1",
463            ComponentType::Source,
464        );
465        let msg2 = LogMessage::with_instance(
466            LogLevel::Error,
467            "Second message",
468            "instance1",
469            "source1",
470            ComponentType::Source,
471        );
472
473        registry.log(msg1).await;
474        registry.log(msg2).await;
475
476        let key = make_key("instance1", ComponentType::Source, "source1");
477        let history = registry.get_history_by_key(&key).await;
478        assert_eq!(history.len(), 2);
479        assert_eq!(history[0].message, "First message");
480        assert_eq!(history[1].message, "Second message");
481        assert_eq!(history[1].level, LogLevel::Error);
482    }
483
484    #[tokio::test]
485    async fn test_max_history_limit() {
486        let registry = ComponentLogRegistry::with_capacity(3, 10);
487
488        for i in 0..5 {
489            let msg = LogMessage::with_instance(
490                LogLevel::Info,
491                format!("Message {i}"),
492                "instance1",
493                "source1",
494                ComponentType::Source,
495            );
496            registry.log(msg).await;
497        }
498
499        let key = make_key("instance1", ComponentType::Source, "source1");
500        let history = registry.get_history_by_key(&key).await;
501        assert_eq!(history.len(), 3);
502        // Should have messages 2, 3, 4 (oldest removed)
503        assert_eq!(history[0].message, "Message 2");
504        assert_eq!(history[2].message, "Message 4");
505    }
506
507    #[tokio::test]
508    async fn test_subscribe_gets_history_and_live() {
509        let registry = Arc::new(ComponentLogRegistry::new());
510
511        // Log some history first
512        let msg1 = LogMessage::with_instance(
513            LogLevel::Info,
514            "History 1",
515            "instance1",
516            "source1",
517            ComponentType::Source,
518        );
519        registry.log(msg1).await;
520
521        // Subscribe
522        let key = make_key("instance1", ComponentType::Source, "source1");
523        let (history, mut receiver) = registry.subscribe_by_key(&key).await;
524        assert_eq!(history.len(), 1);
525        assert_eq!(history[0].message, "History 1");
526
527        // Log a new message after subscribing
528        let registry_clone = registry.clone();
529        tokio::spawn(async move {
530            sleep(Duration::from_millis(10)).await;
531            let msg2 = LogMessage::with_instance(
532                LogLevel::Info,
533                "Live message",
534                "instance1",
535                "source1",
536                ComponentType::Source,
537            );
538            registry_clone.log(msg2).await;
539        });
540
541        // Should receive the live message
542        let live_msg = receiver.recv().await.unwrap();
543        assert_eq!(live_msg.message, "Live message");
544    }
545
546    #[tokio::test]
547    async fn test_remove_component() {
548        let registry = ComponentLogRegistry::new();
549
550        let msg = LogMessage::with_instance(
551            LogLevel::Info,
552            "Test",
553            "instance1",
554            "source1",
555            ComponentType::Source,
556        );
557        registry.log(msg).await;
558
559        let key = make_key("instance1", ComponentType::Source, "source1");
560        assert_eq!(registry.log_count_by_key(&key).await, 1);
561
562        registry.remove_component_by_key(&key).await;
563
564        assert_eq!(registry.log_count_by_key(&key).await, 0);
565    }
566
567    #[tokio::test]
568    async fn test_multiple_components() {
569        let registry = ComponentLogRegistry::new();
570
571        let msg1 = LogMessage::with_instance(
572            LogLevel::Info,
573            "Source log",
574            "instance1",
575            "source1",
576            ComponentType::Source,
577        );
578        let msg2 = LogMessage::with_instance(
579            LogLevel::Info,
580            "Query log",
581            "instance1",
582            "query1",
583            ComponentType::Query,
584        );
585
586        registry.log(msg1).await;
587        registry.log(msg2).await;
588
589        let source_key = make_key("instance1", ComponentType::Source, "source1");
590        let query_key = make_key("instance1", ComponentType::Query, "query1");
591
592        let source_history = registry.get_history_by_key(&source_key).await;
593        let query_history = registry.get_history_by_key(&query_key).await;
594
595        assert_eq!(source_history.len(), 1);
596        assert_eq!(query_history.len(), 1);
597        assert_eq!(source_history[0].component_type, ComponentType::Source);
598        assert_eq!(query_history[0].component_type, ComponentType::Query);
599    }
600
601    #[tokio::test]
602    async fn test_instance_isolation() {
603        // Test that different instances with the same component ID are isolated
604        let registry = ComponentLogRegistry::new();
605
606        // Same component ID, different instances
607        let msg1 = LogMessage::with_instance(
608            LogLevel::Info,
609            "Instance 1 log",
610            "instance1",
611            "my-source",
612            ComponentType::Source,
613        );
614        let msg2 = LogMessage::with_instance(
615            LogLevel::Info,
616            "Instance 2 log",
617            "instance2",
618            "my-source",
619            ComponentType::Source,
620        );
621
622        registry.log(msg1).await;
623        registry.log(msg2).await;
624
625        let key1 = make_key("instance1", ComponentType::Source, "my-source");
626        let key2 = make_key("instance2", ComponentType::Source, "my-source");
627
628        let history1 = registry.get_history_by_key(&key1).await;
629        let history2 = registry.get_history_by_key(&key2).await;
630
631        // Each instance should only see its own logs
632        assert_eq!(history1.len(), 1);
633        assert_eq!(history2.len(), 1);
634        assert_eq!(history1[0].message, "Instance 1 log");
635        assert_eq!(history2[0].message, "Instance 2 log");
636    }
637
638    #[test]
639    fn test_component_log_key() {
640        let key = ComponentLogKey::new("my-instance", ComponentType::Source, "my-source");
641        assert_eq!(key.to_string_key(), "my-instance:source:my-source");
642        assert_eq!(key.instance_id, "my-instance");
643        assert_eq!(key.component_type, ComponentType::Source);
644        assert_eq!(key.component_id, "my-source");
645    }
646
647    #[test]
648    fn test_log_level_ordering() {
649        assert!(LogLevel::Trace < LogLevel::Debug);
650        assert!(LogLevel::Debug < LogLevel::Info);
651        assert!(LogLevel::Info < LogLevel::Warn);
652        assert!(LogLevel::Warn < LogLevel::Error);
653    }
654
655    #[test]
656    fn test_log_level_display() {
657        assert_eq!(format!("{}", LogLevel::Trace), "TRACE");
658        assert_eq!(format!("{}", LogLevel::Debug), "DEBUG");
659        assert_eq!(format!("{}", LogLevel::Info), "INFO");
660        assert_eq!(format!("{}", LogLevel::Warn), "WARN");
661        assert_eq!(format!("{}", LogLevel::Error), "ERROR");
662    }
663}