Skip to main content

drasi_lib/managers/
tracing_layer.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tracing layer for routing logs to component-specific streams.
16//!
17//! This module provides a custom `tracing_subscriber::Layer` that captures log events
18//! and routes them to the appropriate component's log stream based on span context.
19//!
20//! # How It Works
21//!
22//! 1. Components create tracing spans with `component_id` and `component_type` attributes
23//! 2. Log events (from `tracing::info!()` or `log::info!()` via bridge) occur within these spans
24//! 3. `ComponentLogLayer` extracts the component info from the span hierarchy
//! 4. Logs are enqueued (non-blocking) on a bounded channel and written by a background worker to `ComponentLogRegistry` for storage and broadcast
26//!
27//! # Global Registry
28//!
29//! Since `tracing` uses a single global subscriber per process, we use a shared global
30//! `ComponentLogRegistry` that all `DrasiLib` instances can access. Call `get_or_init_global_registry()`
31//! to get the shared registry, which will be initialized on first use.
32//!
33//! # Example
34//!
35//! ```ignore
36//! use tracing::Instrument;
37//!
38//! // Create a span for the component
39//! let span = tracing::info_span!(
40//!     "source",
41//!     component_id = %source_id,
42//!     component_type = "source"
43//! );
44//!
45//! // Run code within the span - logs are automatically routed
46//! async {
47//!     tracing::info!("Starting source");
48//!     // or log::info!("Starting source"); - works via tracing-log bridge
49//! }.instrument(span).await;
50//! ```
51
52use std::sync::Arc;
53
54use tokio::sync::mpsc;
55use tracing::field::{Field, Visit};
56use tracing::{Event, Subscriber};
57use tracing_subscriber::layer::Context;
58use tracing_subscriber::registry::LookupSpan;
59use tracing_subscriber::Layer;
60
61use super::component_log::{ComponentLogRegistry, LogLevel, LogMessage};
62use crate::channels::ComponentType;
63
64use std::sync::OnceLock;
65
/// Capacity of the bounded channel between the tracing layer and the log worker.
///
/// This caps how many component log messages can be queued (and therefore the memory they
/// use). The layer enqueues with a non-blocking `try_send`, so this is not backpressure:
/// when the queue is full, new messages are dropped from the component streams instead of
/// blocking the logging call site. Console output via the `fmt` layer is unaffected.
const LOG_CHANNEL_CAPACITY: usize = 10_000;
69
70/// Global log registry shared by all DrasiLib instances.
71/// Since tracing uses a single global subscriber, we need a single shared registry.
72static GLOBAL_LOG_REGISTRY: OnceLock<Arc<ComponentLogRegistry>> = OnceLock::new();
73
74/// Global sender for the log worker. Initialized alongside the registry.
75static GLOBAL_LOG_SENDER: OnceLock<mpsc::Sender<LogMessage>> = OnceLock::new();
76
77/// Get or initialize the shared global log registry.
78///
79/// This returns a shared registry that all DrasiLib instances use. The tracing
80/// subscriber is global (per-process), so all logs from all DrasiLib instances
81/// go to the same registry.
82///
83/// On first call, this initializes the tracing subscriber with the registry
84/// and spawns a background worker to process log messages.
85pub fn get_or_init_global_registry() -> Arc<ComponentLogRegistry> {
86    GLOBAL_LOG_REGISTRY
87        .get_or_init(|| {
88            let registry = Arc::new(ComponentLogRegistry::new());
89
90            // Create bounded channel for log messages
91            let (tx, rx) = mpsc::channel::<LogMessage>(LOG_CHANNEL_CAPACITY);
92
93            // Store sender globally for the tracing layer to use
94            let _ = GLOBAL_LOG_SENDER.set(tx);
95
96            // Spawn the log worker in a dedicated thread with its own runtime.
97            // This ensures the worker is independent of any caller's runtime.
98            spawn_log_worker(registry.clone(), rx);
99
100            // Initialize tracing subscriber
101            init_tracing_internal(registry.clone());
102
103            registry
104        })
105        .clone()
106}
107
108/// Spawn the background worker that processes log messages.
109///
110/// This worker drains the channel and writes logs to the registry.
111/// Uses a dedicated thread with its own tokio runtime to ensure
112/// independence from the caller's async context.
113fn spawn_log_worker(registry: Arc<ComponentLogRegistry>, mut rx: mpsc::Receiver<LogMessage>) {
114    std::thread::Builder::new()
115        .name("drasi-log-worker".to_string())
116        .spawn(move || {
117            let rt = tokio::runtime::Builder::new_current_thread()
118                .enable_all()
119                .build()
120                .expect("Failed to create log worker runtime");
121
122            rt.block_on(async move {
123                while let Some(message) = rx.recv().await {
124                    registry.log(message).await;
125                }
126            });
127        })
128        .expect("Failed to spawn log worker thread");
129}
130
131/// Initialize the tracing subscriber with component log routing.
132///
133/// This sets up:
134/// - `ComponentLogLayer` for routing logs to component-specific streams
135/// - `fmt::layer()` for console output
136/// - `EnvFilter` for level control via `RUST_LOG` environment variable
137/// - `tracing-log` bridge for `log` crate compatibility
138///
139/// # Arguments
140///
141/// * `log_registry` - The registry to route component logs to
142///
143/// # Example
144///
145/// ```ignore
146/// use drasi_lib::managers::ComponentLogRegistry;
147/// use std::sync::Arc;
148///
149/// let log_registry = Arc::new(ComponentLogRegistry::new());
150/// drasi_lib::init_tracing(log_registry.clone());
151///
152/// // Now both tracing::info!() and log::info!() work
153/// tracing::info!("Hello from tracing");
154/// log::info!("Hello from log crate");
155/// ```
156///
157/// # Note
158///
159/// If another `log` crate logger was initialized before calling this function,
160/// `log::info!()` calls will go to that logger instead. However, `tracing::info!()`
161/// calls will still be captured correctly.
162///
163/// # Deprecated
164///
165/// Prefer using `get_or_init_global_registry()` which handles initialization automatically.
166/// This function is kept for backward compatibility.
167pub fn init_tracing(log_registry: Arc<ComponentLogRegistry>) {
168    // Ensure global registry is initialized (which sets up the channel worker)
169    let _ = get_or_init_global_registry();
170
171    // If caller provided a different registry, warn them
172    // (can't actually use it since tracing subscriber is already set)
173    if !Arc::ptr_eq(&log_registry, &get_or_init_global_registry()) {
174        tracing::warn!(
175            "init_tracing called with custom registry, but global registry already initialized. \
176             The provided registry will be ignored. Use get_or_init_global_registry() instead."
177        );
178    }
179}
180
181/// Internal initialization - sets up tracing subscriber without channel/worker.
182fn init_tracing_internal(log_registry: Arc<ComponentLogRegistry>) {
183    use tracing_subscriber::prelude::*;
184    use tracing_subscriber::{fmt, EnvFilter};
185
186    // Try to install the log->tracing bridge
187    // Use init() which returns Result, ignore error if logger already set
188    let _ = tracing_log::LogTracer::init();
189
190    // Build the subscriber with our custom layer
191    // Use RUST_LOG if set, otherwise default to INFO level
192    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
193
194    let subscriber = tracing_subscriber::registry()
195        .with(filter)
196        .with(ComponentLogLayer::new(log_registry))
197        .with(fmt::layer().with_target(true).with_level(true));
198
199    // Try to set as the global subscriber
200    // Use try_init to handle case where subscriber is already set
201    let _ = tracing::subscriber::set_global_default(subscriber);
202}
203
204/// Try to initialize tracing, returning whether initialization succeeded.
205///
206/// Unlike `init_tracing()`, this returns `false` if a subscriber is already set,
207/// allowing callers to handle this case.
208///
209/// # Deprecated
210///
211/// Prefer using `get_or_init_global_registry()` which handles initialization automatically.
212pub fn try_init_tracing(log_registry: Arc<ComponentLogRegistry>) -> bool {
213    // Check if already initialized
214    if GLOBAL_LOG_REGISTRY.get().is_some() {
215        return false;
216    }
217
218    // Initialize via the standard path
219    let _ = get_or_init_global_registry();
220
221    // Warn if caller's registry differs
222    if !Arc::ptr_eq(&log_registry, &get_or_init_global_registry()) {
223        tracing::warn!(
224            "try_init_tracing called with custom registry, but initialization uses global registry. \
225             The provided registry will be ignored."
226        );
227    }
228
229    true
230}
231
232/// Tracing layer that routes log events to component-specific streams.
233///
234/// This layer intercepts all tracing events and checks if they occur within
235/// a span that has `component_id` and `component_type` attributes. If so,
236/// the log is routed to that component's log stream in the registry.
237pub struct ComponentLogLayer {
238    registry: Arc<ComponentLogRegistry>,
239}
240
241impl ComponentLogLayer {
242    /// Create a new layer with the given log registry.
243    pub fn new(registry: Arc<ComponentLogRegistry>) -> Self {
244        Self { registry }
245    }
246}
247
248impl<S> Layer<S> for ComponentLogLayer
249where
250    S: Subscriber + for<'a> LookupSpan<'a>,
251{
252    fn on_new_span(
253        &self,
254        attrs: &tracing::span::Attributes<'_>,
255        id: &tracing::span::Id,
256        ctx: Context<'_, S>,
257    ) {
258        // Extract component info from span attributes and cache in extensions
259        let mut visitor = ComponentInfoVisitor::default();
260        attrs.record(&mut visitor);
261
262        if let Some(info) = visitor.into_component_info() {
263            if let Some(span) = ctx.span(id) {
264                let mut extensions = span.extensions_mut();
265                extensions.insert(info);
266            }
267        }
268    }
269
270    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
271        // Try to find component context from current or parent spans
272        let component_info = ctx.event_span(event).and_then(|span| {
273            // Walk up the span tree to find component info
274            let mut current = Some(span);
275            while let Some(span_ref) = current {
276                if let Some(info) = extract_component_info(&span_ref) {
277                    return Some(info);
278                }
279                current = span_ref.parent();
280            }
281            None
282        });
283
284        // If we found component context, route the log
285        if let Some(info) = component_info {
286            let level = convert_level(*event.metadata().level());
287            let message = extract_message(event);
288
289            let log_message = LogMessage::with_instance(
290                level,
291                message,
292                info.instance_id,
293                info.component_id,
294                info.component_type,
295            );
296
297            // Send to the log worker via bounded channel
298            // This provides backpressure instead of spawning unbounded tasks
299            if let Some(sender) = GLOBAL_LOG_SENDER.get() {
300                // Use try_send to avoid blocking in the tracing layer
301                // If channel is full, log is dropped (better than OOM)
302                if sender.try_send(log_message).is_err() {
303                    // Channel full or closed - log still goes to console via fmt layer
304                    // Could add a metric here for monitoring dropped logs
305                }
306            }
307        }
308    }
309}
310
311/// Component info stored in span extensions.
312#[derive(Clone)]
313struct ComponentInfo {
314    instance_id: String,
315    component_id: String,
316    component_type: ComponentType,
317}
318
319/// Extract component info from a span's cached extensions.
320fn extract_component_info<S>(
321    span: &tracing_subscriber::registry::SpanRef<'_, S>,
322) -> Option<ComponentInfo>
323where
324    S: Subscriber + for<'a> LookupSpan<'a>,
325{
326    // Component info is cached in span extensions during on_new_span
327    let extensions = span.extensions();
328    extensions.get::<ComponentInfo>().cloned()
329}
330
331/// Visitor for extracting component info from span/event fields.
332#[derive(Default)]
333struct ComponentInfoVisitor {
334    instance_id: Option<String>,
335    component_id: Option<String>,
336    component_type: Option<String>,
337}
338
339impl Visit for ComponentInfoVisitor {
340    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
341        match field.name() {
342            "instance_id" => {
343                self.instance_id = Some(format!("{value:?}").trim_matches('"').to_string())
344            }
345            "component_id" => {
346                self.component_id = Some(format!("{value:?}").trim_matches('"').to_string())
347            }
348            "component_type" => {
349                self.component_type = Some(format!("{value:?}").trim_matches('"').to_string())
350            }
351            _ => {}
352        }
353    }
354
355    fn record_str(&mut self, field: &Field, value: &str) {
356        match field.name() {
357            "instance_id" => self.instance_id = Some(value.to_string()),
358            "component_id" => self.component_id = Some(value.to_string()),
359            "component_type" => self.component_type = Some(value.to_string()),
360            _ => {}
361        }
362    }
363}
364
365impl ComponentInfoVisitor {
366    fn into_component_info(self) -> Option<ComponentInfo> {
367        let component_id = self.component_id?;
368        let component_type = self
369            .component_type
370            .as_deref()
371            .and_then(parse_component_type)?;
372        Some(ComponentInfo {
373            instance_id: self.instance_id.unwrap_or_default(),
374            component_id,
375            component_type,
376        })
377    }
378}
379
380/// Parse a component type string into a ComponentType enum.
381fn parse_component_type(s: &str) -> Option<ComponentType> {
382    match s.to_lowercase().as_str() {
383        "source" => Some(ComponentType::Source),
384        "query" => Some(ComponentType::Query),
385        "reaction" => Some(ComponentType::Reaction),
386        _ => None,
387    }
388}
389
390/// Visitor for extracting the message from an event.
391#[derive(Default)]
392struct MessageVisitor {
393    message: Option<String>,
394    fields: Vec<String>,
395}
396
397impl Visit for MessageVisitor {
398    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
399        if field.name() == "message" {
400            self.message = Some(format!("{value:?}").trim_matches('"').to_string());
401        } else {
402            self.fields.push(format!("{}={value:?}", field.name()));
403        }
404    }
405
406    fn record_str(&mut self, field: &Field, value: &str) {
407        if field.name() == "message" {
408            self.message = Some(value.to_string());
409        } else {
410            self.fields.push(format!("{}={}", field.name(), value));
411        }
412    }
413}
414
415/// Extract the message from a tracing event.
416fn extract_message(event: &Event<'_>) -> String {
417    let mut visitor = MessageVisitor::default();
418    event.record(&mut visitor);
419
420    if let Some(msg) = visitor.message {
421        msg
422    } else if !visitor.fields.is_empty() {
423        visitor.fields.join(", ")
424    } else {
425        // Fallback: use the event metadata name
426        event.metadata().name().to_string()
427    }
428}
429
430/// Convert tracing Level to our LogLevel.
431fn convert_level(level: tracing::Level) -> LogLevel {
432    match level {
433        tracing::Level::ERROR => LogLevel::Error,
434        tracing::Level::WARN => LogLevel::Warn,
435        tracing::Level::INFO => LogLevel::Info,
436        tracing::Level::DEBUG => LogLevel::Debug,
437        tracing::Level::TRACE => LogLevel::Trace,
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;

    /// Known names parse regardless of letter case; anything else is rejected.
    #[test]
    fn test_parse_component_type() {
        let cases = [
            ("source", Some(ComponentType::Source)),
            ("Source", Some(ComponentType::Source)),
            ("SOURCE", Some(ComponentType::Source)),
            ("query", Some(ComponentType::Query)),
            ("reaction", Some(ComponentType::Reaction)),
            ("unknown", None),
        ];

        for (input, expected) in cases {
            assert_eq!(parse_component_type(input), expected, "input: {input}");
        }
    }

    /// Every tracing level maps to the LogLevel with the same name.
    #[test]
    fn test_convert_level() {
        let cases = [
            (tracing::Level::ERROR, LogLevel::Error),
            (tracing::Level::WARN, LogLevel::Warn),
            (tracing::Level::INFO, LogLevel::Info),
            (tracing::Level::DEBUG, LogLevel::Debug),
            (tracing::Level::TRACE, LogLevel::Trace),
        ];

        for (level, expected) in cases {
            assert_eq!(convert_level(level), expected, "level: {level:?}");
        }
    }
}